Commit f3a69b2

[SPARK-53356][PYTHON][DOCS] Small improvements to python data source docs
### What changes were proposed in this pull request?

A few small improvements to the Python data source docs:

- Added more type annotations
- Added a little bit of hierarchy to make the relationships between sections more clear: nested several sections underneath a "Comprehensive Example: Data Source with Batch and Streaming Readers and Writers" section.
- A few small wording tweaks.

### Why are the changes needed?

I was reading the Python data source docs to learn how to use Python data sources and got confused by a few small things.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

Closes #52101 from sryza/python-data-source-docs.

Lead-authored-by: Sandy Ryza <sandyryza@gmail.com>
Co-authored-by: Sandy Ryza <sandy.ryza@databricks.com>
Signed-off-by: Sandy Ryza <sandy.ryza@databricks.com>
1 parent 969a342 commit f3a69b2

File tree: 1 file changed (+85 / -51 lines)


python/docs/source/tutorial/sql/python_data_source.rst

Lines changed: 85 additions & 51 deletions
@@ -26,16 +26,18 @@ Overview
 The Python Data Source API is a new feature introduced in Spark 4.0, enabling developers to read from custom data sources and write to custom data sinks in Python.
 This guide provides a comprehensive overview of the API and instructions on how to create, use, and manage Python data sources.

-Simple Example
---------------
+Simple Example: Data Source with Batch Reader
+---------------------------------------------
 Here's a simple Python data source that generates exactly two rows of synthetic data.
 This example demonstrates how to set up a custom data source without using external libraries, focusing on the essentials needed to get it up and running quickly.

 **Step 1: Define the data source**

 .. code-block:: python

-    from pyspark.sql.datasource import DataSource, DataSourceReader
+    from typing import Iterator, Tuple
+
+    from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
     from pyspark.sql.types import IntegerType, StringType, StructField, StructType

     class SimpleDataSource(DataSource):
@@ -44,21 +46,21 @@ This example demonstrates how to set up a custom data source without using exter
         """

         @classmethod
-        def name(cls):
+        def name(cls) -> str:
             return "simple"

-        def schema(self):
+        def schema(self) -> StructType:
             return StructType([
                 StructField("name", StringType()),
                 StructField("age", IntegerType())
             ])

-        def reader(self, schema: StructType):
+        def reader(self, schema: StructType) -> DataSourceReader:
             return SimpleDataSourceReader()

     class SimpleDataSourceReader(DataSourceReader):

-        def read(self, partition):
+        def read(self, partition: InputPartition) -> Iterator[Tuple]:
             yield ("Alice", 20)
             yield ("Bob", 30)
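(Not part of this diff.) The registration and read steps that follow Step 1 are unchanged by this commit and therefore not shown here; the next hunk only picks up the tail of the expected output. As a minimal sketch of that usage, assuming an active SparkSession bound to `spark`:

.. code-block:: python

    # Register the data source under the short name returned by
    # SimpleDataSource.name(), i.e. "simple".
    spark.dataSource.register(SimpleDataSource)

    # Read from it; read() yields the two synthetic rows defined above.
    spark.read.format("simple").load().show()
    # +-----+---+
    # | name|age|
    # +-----+---+
    # |Alice| 20|
    # |  Bob| 30|
    # +-----+---+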
@@ -86,17 +88,18 @@ This example demonstrates how to set up a custom data source without using exter
     # +-----+---+


-Creating a Python Data Source
------------------------------
-To create a custom Python data source, you'll need to subclass the :class:`DataSource` base classes and implement the necessary methods for reading and writing data.
+Comprehensive Example: Data Source with Batch and Streaming Readers and Writers
+-------------------------------------------------------------------------------
+To create a custom Python data source, you'll need to subclass the :class:`DataSource` base class and implement the necessary methods for reading and writing data.

 This example demonstrates creating a simple data source to generate synthetic data using the `faker` library. Ensure the `faker` library is installed and accessible in your Python environment.

-**Define the Data Source**
+Define the Data Source
+~~~~~~~~~~~~~~~~~~~~~~

-Start by creating a new subclass of :class:`DataSource` with the source name, schema.
+Start by creating a new subclass of :class:`DataSource` with the source name and schema.

-In order to be used as source or sink in batch or streaming query, corresponding method of DataSource needs to be implemented.
+In order to be used as source or sink in batch or streaming query, corresponding methods of DataSource needs to be implemented.

 Method that needs to be implemented for a capability:
@@ -112,7 +115,15 @@ Method that needs to be implemented for a capability:

 .. code-block:: python

-    from pyspark.sql.datasource import DataSource, DataSourceReader
+    from typing import Union
+
+    from pyspark.sql.datasource import (
+        DataSource,
+        DataSourceReader,
+        DataSourceStreamReader,
+        DataSourceStreamWriter,
+        DataSourceWriter
+    )
     from pyspark.sql.types import StructType

     class FakeDataSource(DataSource):
@@ -123,39 +134,36 @@ Method that needs to be implemented for a capability:
         """

         @classmethod
-        def name(cls):
+        def name(cls) -> str:
             return "fake"

-        def schema(self):
+        def schema(self) -> Union[StructType, str]:
             return "name string, date string, zipcode string, state string"

-        def reader(self, schema: StructType):
+        def reader(self, schema: StructType) -> DataSourceReader:
             return FakeDataSourceReader(schema, self.options)

-        def writer(self, schema: StructType, overwrite: bool):
+        def writer(self, schema: StructType, overwrite: bool) -> DataSourceWriter:
             return FakeDataSourceWriter(self.options)

-        def streamReader(self, schema: StructType):
+        def streamReader(self, schema: StructType) -> DataSourceStreamReader:
             return FakeStreamReader(schema, self.options)

-        # Please skip the implementation of this method if streamReader has been implemented.
-        def simpleStreamReader(self, schema: StructType):
-            return SimpleStreamReader()
-
-        def streamWriter(self, schema: StructType, overwrite: bool):
+        def streamWriter(self, schema: StructType, overwrite: bool) -> DataSourceStreamWriter:
             return FakeStreamWriter(self.options)

-Implementing Batch Reader and Writer for Python Data Source
------------------------------------------------------------
-**Implement the Reader**
+Implement a Batch Reader
+~~~~~~~~~~~~~~~~~~~~~~~~

 Define the reader logic to generate synthetic data. Use the `faker` library to populate each field in the schema.

 .. code-block:: python

+    from typing import Dict
+
     class FakeDataSourceReader(DataSourceReader):

-        def __init__(self, schema, options):
+        def __init__(self, schema: StructType, options: Dict[str, str]):
             self.schema: StructType = schema
             self.options = options
@@ -171,10 +179,10 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
                     row.append(value)
                 yield tuple(row)

-**Implement the Writer**
+Implement a Batch Writer
+~~~~~~~~~~~~~~~~~~~~~~~~

-Create a fake data source writer that processes each partition of data, counts the rows, and either
-prints the total count of rows after a successful write or the number of failed tasks if the writing process fails.
+Create a fake data source writer that processes each partition of data, counts the rows, and either prints the total count of rows after a successful write or the number of failed tasks if the writing process fails.

 .. code-block:: python
@@ -208,16 +216,15 @@ prints the total count of rows after a successful write or the number of failed
             print(f"Number of failed tasks: {failed_count}")


-Implementing Streaming Reader and Writer for Python Data Source
----------------------------------------------------------------
-**Implement the Stream Reader**
+Implement a Streaming Reader
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~

 This is a dummy streaming data reader that generate 2 rows in every microbatch. The streamReader instance has a integer offset that increase by 2 in every microbatch.

 .. code-block:: python

     class RangePartition(InputPartition):
-        def __init__(self, start, end):
+        def __init__(self, start: int, end: int):
             self.start = start
             self.end = end
@@ -238,14 +245,14 @@ This is a dummy streaming data reader that generate 2 rows in every microbatch.
             self.current += 2
             return {"offset": self.current}

-        def partitions(self, start: dict, end: dict):
+        def partitions(self, start: dict, end: dict) -> list[InputPartition]:
             """
             Plans the partitioning of the current microbatch defined by start and end offset,
             it needs to return a sequence of :class:`InputPartition` object.
             """
             return [RangePartition(start["offset"], end["offset"])]

-        def commit(self, end: dict):
+        def commit(self, end: dict) -> None:
             """
             This is invoked when the query has finished processing data before end offset, this can be used to clean up resource.
             """
@@ -259,24 +266,44 @@ This is a dummy streaming data reader that generate 2 rows in every microbatch.
             for i in range(start, end):
                 yield (i, str(i))

-**Implement the Simple Stream Reader**
+Alternative: Implement a Simple Streaming Reader
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

 If the data source has low throughput and doesn't require partitioning, you can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.

-One of simpleStreamReader() and streamReader() must be implemented for readable streaming data source. And simpleStreamReader() will only be invoked when streamReader() is not implemented.
+One of simpleStreamReader() and streamReader() must be implemented for a readable streaming data source. And simpleStreamReader() will only be invoked when streamReader() is not implemented.
+
+.. code-block:: python
+
+    from pyspark.sql.datasource import SimpleDataSourceStreamReader
+
+
+    class FakeDataSource(DataSource):
+        ...
+
+        def simpleStreamReader(self, schema: StructType) -> SimpleDataSourceStreamReader:
+            return FakeSimpleStreamReader()
+
+        # omit implementation of streamReader
+
+        ...

 This is the same dummy streaming reader that generate 2 rows every batch implemented with SimpleDataSourceStreamReader interface.

 .. code-block:: python

-    class SimpleStreamReader(SimpleDataSourceStreamReader):
-        def initialOffset(self):
+    from typing import Iterator, Tuple
+
+    from pyspark.sql.datasource import SimpleDataSourceStreamReader
+
+    class FakeSimpleStreamReader(SimpleDataSourceStreamReader):
+        def initialOffset(self) -> dict:
             """
             Return the initial start offset of the reader.
             """
             return {"offset": 0}

-        def read(self, start: dict) -> (Iterator[Tuple], dict):
+        def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
             """
             Takes start offset as an input, return an iterator of tuples and the start offset of next read.
             """
@@ -293,18 +320,25 @@ This is the same dummy streaming reader that generate 2 rows every batch impleme
             end_idx = end["offset"]
             return iter([(i,) for i in range(start_idx, end_idx)])

-        def commit(self, end):
+        def commit(self, end: dict) -> None:
             """
             This is invoked when the query has finished processing data before end offset, this can be used to clean up resource.
             """
             pass

-**Implement the Stream Writer**
+Implement a Streaming Writer
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~

 This is a streaming data writer that write the metadata information of each microbatch to a local path.

 .. code-block:: python

+    from typing import Iterator, List, Optional
+
+    from pyspark.sql import Row
+    from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage
+
+
     class SimpleCommitMessage(WriterCommitMessage):
         partition_id: int
         count: int
@@ -315,7 +349,7 @@ This is a streaming data writer that write the metadata information of each micr
             self.path = self.options.get("path")
             assert self.path is not None

-        def write(self, iterator):
+        def write(self, iterator: Iterator[Row]) -> WriterCommitMessage:
             """
             Write the data and return the commit message of that partition
             """
@@ -327,18 +361,18 @@ This is a streaming data writer that write the metadata information of each micr
                 cnt += 1
             return SimpleCommitMessage(partition_id=partition_id, count=cnt)

-        def commit(self, messages, batchId) -> None:
+        def commit(self, messages: List[Optional[SimpleCommitMessage]], batchId: int) -> None:
             """
-            Receives a sequence of :class:`WriterCommitMessage` when all write tasks succeed and decides what to do with it.
+            Receives a sequence of :class:`SimpleCommitMessage` when all write tasks succeed and decides what to do with it.
             In this FakeStreamWriter, we write the metadata of the microbatch(number of rows and partitions) into a json file inside commit().
             """
             status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
             with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
                 file.write(json.dumps(status) + "\n")

-        def abort(self, messages, batchId) -> None:
+        def abort(self, messages: List[Optional[SimpleCommitMessage]], batchId: int) -> None:
             """
-            Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some tasks fail and decides what to do with it.
+            Receives a sequence of :class:`SimpleCommitMessage` from successful tasks when some tasks fail and decides what to do with it.
             In this FakeStreamWriter, we write a failure message into a txt file inside abort().
             """
             with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
@@ -348,7 +382,7 @@ Serialization Requirement
 -------------------------
 User defined DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader and DataSourceStreamWriter and their methods must be able to be serialized by pickle.

-For library that are used inside a method, it must be imported inside the method. For example, TaskContext must be imported inside the read() method in the code below.
+For libraries that are used inside a method, they must be imported inside the method. For example, TaskContext must be imported inside the read() method in the code below.

 .. code-block:: python
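(Not part of this diff.) The code block this paragraph refers to is unchanged by the commit and not shown here. As an illustrative sketch only — the class name and yielded fields are hypothetical, not the file's actual example — a reader that defers the TaskContext import to read() might look like:

.. code-block:: python

    from pyspark.sql.datasource import DataSourceReader

    class ExampleReader(DataSourceReader):  # hypothetical name, for illustration only
        def read(self, partition):
            # Import inside the method so the reader instance itself stays picklable.
            from pyspark import TaskContext

            context = TaskContext.get()
            partition_id = context.partitionId()
            yield (partition_id,)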
@@ -358,7 +392,7 @@ For library that are used inside a method, it must be imported inside the method

 Using a Python Data Source
 --------------------------
-**Use a Python Data Source in Batch Query**
+**Register a Python Data Source**

 After defining your data source, it must be registered before usage.