Skip to content

Commit d46ba60

Browse files
committed
refactor: consolidate net_* files
Signed-off-by: Gabor Boros <gabor.brs@gmail.com>
1 parent 00b9917 commit d46ba60

File tree

8 files changed

+57
-192
lines changed

8 files changed

+57
-192
lines changed

.coveragerc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ omit = *tests*
1515

1616
[report]
1717
sort = cover
18-
fail_under = 65
18+
fail_under = 60
1919
exclude_lines = pragma: no cover
2020
if __name__ == .__main__.:

rethinkdb/net_asyncio.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import ssl
2222
import struct
2323

24-
from rethinkdb import ql2_pb2
2524
from rethinkdb.errors import (
2625
ReqlAuthError,
2726
ReqlCursorEmpty,
@@ -30,16 +29,12 @@
3029
)
3130
from rethinkdb.net import Connection as ConnectionBase
3231
from rethinkdb.net import Cursor, Query, Response, maybe_profile
32+
from rethinkdb.ql2_pb2 import Query as PbQuery
33+
from rethinkdb.ql2_pb2 import Response as PbResponse
3334

3435
__all__ = ["Connection"]
3536

3637

37-
# pylint: disable=invalid-name
38-
PB_RESPONSE = ql2_pb2.Response.ResponseType
39-
PB_QUERY = ql2_pb2.Query.QueryType
40-
# pylint: enable=invalid-name
41-
42-
4338
async def _read_until(streamreader, delimiter):
4439
"""Naive implementation of reading until a delimiter"""
4540
buffer = bytearray()
@@ -292,7 +287,7 @@ async def close(self, noreply_wait=False, token=None, exception=None):
292287
self._cursor_cache = {}
293288

294289
if noreply_wait:
295-
noreply = Query(PB_QUERY.NOREPLY_WAIT, token, None, None)
290+
noreply = Query(PbQuery.QueryType.NOREPLY_WAIT, token, None, None)
296291
await self.run_query(noreply, False)
297292

298293
self._streamwriter.close()
@@ -349,17 +344,17 @@ async def _reader(self):
349344
query
350345
), # pylint: disable=protected-access
351346
)
352-
if res.response_type == PB_RESPONSE.SUCCESS_ATOM:
347+
if res.response_type == PbResponse.ResponseType.SUCCESS_ATOM:
353348
future.set_result(maybe_profile(res.data[0], res))
354349
elif res.response_type in (
355-
PB_RESPONSE.SUCCESS_SEQUENCE,
356-
PB_RESPONSE.SUCCESS_PARTIAL,
350+
PbResponse.ResponseType.SUCCESS_SEQUENCE,
351+
PbResponse.ResponseType.SUCCESS_PARTIAL,
357352
):
358353
cursor = AsyncioCursor(self, query, res)
359354
future.set_result(maybe_profile(cursor, res))
360-
elif res.response_type == PB_RESPONSE.WAIT_COMPLETE:
355+
elif res.response_type == PbResponse.ResponseType.WAIT_COMPLETE:
361356
future.set_result(None)
362-
elif res.response_type == PB_RESPONSE.SERVER_INFO:
357+
elif res.response_type == PbResponse.ResponseType.SERVER_INFO:
363358
future.set_result(res.data[0])
364359
else:
365360
future.set_exception(res.make_error(query))
@@ -391,7 +386,7 @@ async def __aexit__(self, exception_type, exception_val, traceback):
391386

392387
async def _stop(self, cursor):
393388
self.check_open()
394-
q = Query(PB_QUERY.STOP, cursor.query.token, None, None)
389+
q = Query(PbQuery.QueryType.STOP, cursor.query.token, None, None)
395390
return await self._instance.run_query(
396391
q, True
397392
) # pylint: disable=protected-access

rethinkdb/net_common.py

Lines changed: 0 additions & 47 deletions
This file was deleted.

rethinkdb/net_gevent.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,23 @@
2525
from gevent.event import AsyncResult, Event # type: ignore[import-untyped]
2626
from gevent.lock import Semaphore # type: ignore[import-untyped]
2727

28-
from rethinkdb import net, ql2_pb2
2928
from rethinkdb.errors import (
3029
ReqlAuthError,
3130
ReqlCursorEmpty,
3231
ReqlDriverError,
3332
ReqlTimeoutError,
3433
)
34+
from rethinkdb.net import Connection as ConnectionBase
35+
from rethinkdb.net import Cursor, Query, Response
36+
from rethinkdb.net import SocketWrapper as SocketWrapperBase
37+
from rethinkdb.net import maybe_profile
38+
from rethinkdb.ql2_pb2 import Query as PbQuery
39+
from rethinkdb.ql2_pb2 import Response as PbResponse
3540

3641
__all__ = ["Connection"]
3742

38-
# pylint: disable=invalid-name
39-
PB_RESPONSE = ql2_pb2.Response.ResponseType
40-
PB_QUERY = ql2_pb2.Query.QueryType
41-
# pylint: enable=invalid-name
4243

43-
44-
class GeventCursor(net.Cursor):
44+
class GeventCursor(Cursor):
4545
"""Gevent-based cursor implementation for RethinkDB queries."""
4646

4747
def __init__(self, *args, **kwargs):
@@ -77,7 +77,7 @@ def _get_next(self, timeout): # pylint: disable=signature-differs
7777
return self.items.popleft()
7878

7979

80-
class SocketWrapper(net.SocketWrapper):
80+
class SocketWrapper(SocketWrapperBase):
8181
"""Gevent-based socket wrapper for RethinkDB connections."""
8282

8383
# pylint: disable=super-init-not-called,too-many-branches,too-many-statements
@@ -267,7 +267,7 @@ def close(self, noreply_wait=False, token=None, exception=None):
267267
self._cursor_cache = {}
268268

269269
if noreply_wait:
270-
noreply = net.Query(PB_QUERY.NOREPLY_WAIT, token, None, None)
270+
noreply = Query(PbQuery.QueryType.NOREPLY_WAIT, token, None, None)
271271
self.run_query(noreply, False)
272272

273273
try:
@@ -311,16 +311,16 @@ def _reader(self):
311311
# Do not pop the query from the dict until later, so
312312
# we don't lose track of it in case of an exception
313313
query, async_res = self._user_queries[token]
314-
res = net.Response(token, buf, self._parent.get_json_decoder(query))
315-
if res.response_type == PB_RESPONSE.SUCCESS_ATOM:
316-
async_res.set(net.maybe_profile(res.data[0], res))
314+
res = Response(token, buf, self._parent.get_json_decoder(query))
315+
if res.response_type == PbResponse.ResponseType.SUCCESS_ATOM:
316+
async_res.set(maybe_profile(res.data[0], res))
317317
elif res.response_type in (
318-
PB_RESPONSE.SUCCESS_SEQUENCE,
319-
PB_RESPONSE.SUCCESS_PARTIAL,
318+
PbResponse.ResponseType.SUCCESS_SEQUENCE,
319+
PbResponse.ResponseType.SUCCESS_PARTIAL,
320320
):
321321
cursor = GeventCursor(self, query, res)
322-
async_res.set(net.maybe_profile(cursor, res))
323-
elif res.response_type == PB_RESPONSE.WAIT_COMPLETE:
322+
async_res.set(maybe_profile(cursor, res))
323+
elif res.response_type == PbResponse.ResponseType.WAIT_COMPLETE:
324324
async_res.set(None)
325325
else:
326326
async_res.set_exception(res.make_error(query))
@@ -332,7 +332,7 @@ def _reader(self):
332332
self.close(exception=exc)
333333

334334

335-
class Connection(net.Connection):
335+
class Connection(ConnectionBase):
336336
"""Gevent-based RethinkDB connection."""
337337

338338
def __init__(self, *args, **kwargs):

rethinkdb/net_tornado.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,16 @@
2525
from tornado.ioloop import IOLoop
2626
from tornado.tcpclient import TCPClient
2727

28-
from rethinkdb.net_common import (
29-
ConnectionBase,
30-
Cursor,
31-
PbQuery,
32-
PbResponse,
33-
Query,
28+
from rethinkdb.errors import (
3429
ReqlAuthError,
3530
ReqlCursorEmpty,
3631
ReqlDriverError,
3732
ReqlTimeoutError,
38-
Response,
39-
maybe_profile,
4033
)
34+
from rethinkdb.net import Connection as ConnectionBase
35+
from rethinkdb.net import Cursor, Query, Response, maybe_profile
36+
from rethinkdb.ql2_pb2 import Query as PbQuery
37+
from rethinkdb.ql2_pb2 import Response as PbResponse
4138

4239
__all__ = ["Connection"]
4340

@@ -262,7 +259,7 @@ async def close(
262259
self._cursor_cache = {}
263260

264261
if noreply_wait and token is not None:
265-
noreply = Query(PbQuery.NOREPLY_WAIT, token, None, None)
262+
noreply = Query(PbQuery.QueryType.NOREPLY_WAIT, token, None, None)
266263
await self.run_query(noreply, False)
267264

268265
if self._stream is not None:
@@ -333,17 +330,17 @@ async def _handle_response(self, token: int, buf: bytes) -> None:
333330
query, future = self._user_queries[token]
334331
res = Response(token, buf, self._parent.get_json_decoder(query))
335332

336-
if res.response_type == PbResponse.SUCCESS_ATOM:
333+
if res.response_type == PbResponse.ResponseType.SUCCESS_ATOM:
337334
future.set_result(maybe_profile(res.data[0], res))
338335
elif res.response_type in (
339-
PbResponse.SUCCESS_SEQUENCE,
340-
PbResponse.SUCCESS_PARTIAL,
336+
PbResponse.ResponseType.SUCCESS_SEQUENCE,
337+
PbResponse.ResponseType.SUCCESS_PARTIAL,
341338
):
342339
cursor = TornadoCursor(self, query, res)
343340
future.set_result(maybe_profile(cast(Any, cursor), res))
344-
elif res.response_type == PbResponse.WAIT_COMPLETE:
341+
elif res.response_type == PbResponse.ResponseType.WAIT_COMPLETE:
345342
future.set_result(None)
346-
elif res.response_type == PbResponse.SERVER_INFO:
343+
elif res.response_type == PbResponse.ResponseType.SERVER_INFO:
347344
future.set_result(res.data[0])
348345
else:
349346
future.set_exception(res.make_error(query))

rethinkdb/net_twisted.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,16 @@
3030
from twisted.internet.error import TimeoutError as TwistedTimeoutError
3131
from twisted.internet.protocol import ClientFactory, Protocol
3232

33-
from rethinkdb.net_common import (
34-
ConnectionBase,
35-
Cursor,
36-
PbQuery,
37-
PbResponse,
38-
Query,
33+
from rethinkdb.errors import (
3934
ReqlAuthError,
4035
ReqlCursorEmpty,
4136
ReqlDriverError,
4237
ReqlTimeoutError,
43-
Response,
44-
maybe_profile,
4538
)
39+
from rethinkdb.net import Connection as ConnectionBase
40+
from rethinkdb.net import Cursor, Query, Response, maybe_profile
41+
from rethinkdb.ql2_pb2 import Query as PbQuery
42+
from rethinkdb.ql2_pb2 import Response as PbResponse
4643

4744
__all__ = ["Connection"]
4845

@@ -324,17 +321,17 @@ def _handle_response(self, token: int, data: bytes) -> None:
324321
elif token in self._user_queries:
325322
query, deferred = self._user_queries[token]
326323
res = Response(token, data, self._parent.get_json_decoder(query))
327-
if res.response_type == PbResponse.SUCCESS_ATOM:
324+
if res.response_type == PbResponse.ResponseType.SUCCESS_ATOM:
328325
deferred.callback(maybe_profile(res.data[0], res))
329326
elif res.response_type in (
330-
PbResponse.SUCCESS_SEQUENCE,
331-
PbResponse.SUCCESS_PARTIAL,
327+
PbResponse.ResponseType.SUCCESS_SEQUENCE,
328+
PbResponse.ResponseType.SUCCESS_PARTIAL,
332329
):
333330
cursor = TwistedCursor(self, query, res)
334331
deferred.callback(maybe_profile(cast(Any, cursor), res))
335-
elif res.response_type == PbResponse.WAIT_COMPLETE:
332+
elif res.response_type == PbResponse.ResponseType.WAIT_COMPLETE:
336333
deferred.callback(None)
337-
elif res.response_type == PbResponse.SERVER_INFO:
334+
elif res.response_type == PbResponse.ResponseType.SERVER_INFO:
338335
deferred.callback(res.data[0])
339336
else:
340337
deferred.errback(res.make_error(query))
@@ -443,7 +440,7 @@ def close(
443440
self.reset_cursor_cache()
444441

445442
if noreply_wait and token is not None:
446-
noreply = Query(PbQuery.NOREPLY_WAIT, token, None, None)
443+
noreply = Query(PbQuery.QueryType.NOREPLY_WAIT, token, None, None)
447444
d = self.run_query(noreply, False)
448445

449446
def close_connection(res: Any) -> Any:

0 commit comments

Comments
 (0)