Skip to content

Commit 7a7b84f

Browse files
authored
Call stream API from separate thread when processing queues (#6)
* Call stream API from separate thread when processing queues * Fix flake8 warning
1 parent c85449d commit 7a7b84f

File tree

5 files changed

+29
-9
lines changed

5 files changed

+29
-9
lines changed

target_datadotworld/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@
2121

2222
import singer
2323

24-
__version__ = '1.0.0b1'
24+
__version__ = '1.0.0b2'
2525

2626
logger = copy(singer.get_logger()) # copy needed in order to set level

target_datadotworld/api_client.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616
#
1717
# This product includes software developed at
1818
# data.world, Inc.(http://data.world/).
19-
19+
import functools
2020
import gzip
21+
from concurrent.futures import ThreadPoolExecutor
2122
from time import sleep
2223

2324
import backoff
@@ -46,6 +47,7 @@ def __init__(self, api_token, **kwargs):
4647
self._api_url = kwargs.get('api_url', 'https://api.data.world/v0')
4748
self._conn_timeout = kwargs.get('connect_timeout', 3.05)
4849
self._read_timeout = kwargs.get('read_timeout', 600)
50+
self._max_threads = kwargs.get('max_threads', 10)
4951

5052
self._session = requests.Session()
5153
default_headers = {
@@ -58,6 +60,11 @@ def __init__(self, api_token, **kwargs):
5860
self._session.mount(self._api_url,
5961
BackoffAdapter(GzipAdapter(HTTPAdapter())))
6062

63+
# Create a limited thread pool.
64+
self._executor = ThreadPoolExecutor(
65+
max_workers=self._max_threads
66+
)
67+
6168
def connection_check(self):
6269
"""Verify network connectivity
6370
@@ -100,7 +107,7 @@ def append_stream(self, owner, dataset, stream, records):
100107
raise convert_requests_exception(e)
101108

102109
async def append_stream_chunked(
103-
self, owner, dataset, stream, queue, chunk_size):
110+
self, owner, dataset, stream, queue, chunk_size, loop):
104111
"""Asynchronously append records to a stream in a data.world dataset
105112
106113
:param owner: User or organization ID of the owner of the dataset
@@ -122,20 +129,33 @@ async def append_stream_chunked(
122129

123130
delayed_exception = None
124131
# noinspection PyTypeChecker
132+
pending_task = None
125133
async for chunk in to_chunks(queue, chunk_size):
126134
if delayed_exception is None:
127135
try:
128136
logger.info('Uploading {} records in batch #{} '
129137
'from {} stream '.format(
130138
len(chunk), counter.value, stream))
131-
# TODO Invoke append_stream in a separate thread
132-
self.append_stream(owner, dataset, stream, chunk)
139+
140+
if pending_task is not None:
141+
# Force chunks to be appended sequentially
142+
await pending_task
143+
144+
# Call API on separate thread
145+
pending_task = loop.run_in_executor(
146+
self._executor,
147+
functools.partial(self.append_stream,
148+
owner, dataset, stream, chunk)
149+
)
133150
counter.increment()
134151
except Exception as e:
135152
delayed_exception = e
136153
else:
137154
pass # Must exhaust queue
138155

156+
if pending_task is not None:
157+
await pending_task
158+
139159
if delayed_exception is not None:
140160
raise delayed_exception
141161

target_datadotworld/target.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ async def process_lines(self, lines, loop=None):
156156
self.config['dataset_id'],
157157
to_stream_id(msg.stream),
158158
queue,
159-
self._batch_size), loop=loop)
159+
self._batch_size, loop=loop), loop=loop)
160160

161161
# Add record to queue
162162
await queues[msg.stream].put(msg.record)

tests/target_datadotworld/test_api_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def verify_body_and_count(req):
9797

9898
consumer = asyncio.ensure_future(client.append_stream_chunked(
9999
'owner', 'dataset', 'stream', queue,
100-
chunk_size=chunk_size), loop=event_loop)
100+
chunk_size=chunk_size, loop=event_loop), loop=event_loop)
101101
await queue.join()
102102
await consumer
103103

@@ -120,7 +120,7 @@ async def test_append_stream_chunked_error(
120120
with pytest.raises(dwex.ApiError):
121121
consumer = asyncio.ensure_future(client.append_stream_chunked(
122122
'owner', 'dataset', 'stream', queue,
123-
chunk_size=chunk_size), loop=event_loop)
123+
chunk_size=chunk_size, loop=event_loop), loop=event_loop)
124124
await queue.join()
125125
await consumer
126126

tests/target_datadotworld/test_target.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def create_dataset(self, owner, dataset, **kwargs):
4444
return {}
4545

4646
async def append_stream_chunked(
47-
self, owner, dataset, stream, queue, chunk_size):
47+
self, owner, dataset, stream, queue, chunk_size, loop):
4848
while True:
4949
item = await queue.get()
5050
queue.task_done()

0 commit comments

Comments (0)