Added fetch_size constructor param #77

Open · wants to merge 2 commits into main
4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -11,6 +11,10 @@ Unreleased
``dependson`` is now a list instead of a filter iterator. This fixes issue #72
where dependencies were only loaded in the first bulk load.

**Changed**
``SQLSource`` now has a ``fetch_size`` constructor parameter so the end user
can control how many rows are held in main memory for each round trip to the
RDBMS.

Version 2.8
-----------
**Added**
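For readers of the changelog entry above, a minimal usage sketch of the new parameter (hedged: it assumes this branch is installed, sqlite3 stands in for any PEP 249 connection, and the `sales` table is made up for illustration):

```python
import sqlite3
from pygrametl.datasources import SQLSource

# In-memory stand-in for the source RDBMS.
conn = sqlite3.connect(":memory:")
conn.executescript("CREATE TABLE sales (id INTEGER, amount INTEGER); "
                   "INSERT INTO sales VALUES (1, 10), (2, 20), (3, 30);")

# Hold at most two rows in main memory per round trip to the source;
# fetch_size=0 (or less) would instead pull everything with one fetchall().
source = SQLSource(conn, "SELECT id, amount FROM sales", fetch_size=2)
for row in source:
    print(row)  # {'id': 1, 'amount': 10}, then {'id': 2, 'amount': 20}, ...
```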
15 changes: 13 additions & 2 deletions pygrametl/datasources.py
@@ -106,7 +106,7 @@ class SQLSource(object):
"""A class for iterating the result set of a single SQL query."""

def __init__(self, connection, query, names=(), initsql=None,
cursorarg=None, parameters=None):
cursorarg=None, parameters=None, fetch_size=500):
"""Arguments:

- connection: the PEP 249 connection to use. NOT a
@@ -120,6 +120,8 @@ def __init__(self, connection, query, names=(), initsql=None,
             the connection's cursor method is called. Default: None.
           - parameters: if not None, this sequence or mapping of parameters
             will be sent when the query is executed.
           - fetch_size: the number of rows to fetch into main memory for each
             round trip to the source. If fetch_size is 0 or less, all rows
             are fetched at once. Default: 500.
        """
        self.connection = connection
        self.fetch_size = fetch_size
        if cursorarg is not None:
@@ -145,7 +147,11 @@ def __iter__(self):
            names = self.names or \
                [t[0] for t in self.cursor.description]
            while True:
                data = self.cursor.fetchmany(500)
                if self.fetch_size <= 0:
                    data = self.cursor.fetchall()
                else:
                    data = self.cursor.fetchmany(self.fetch_size)

                if not data:
                    break
                if not names:
@@ -160,6 +166,11 @@
"%d given, %d needed." % (len(names), len(data[0])))
for row in data:
yield dict(zip(names, row))

# It is not well defined in PEP 249 what fetchall will do if called twice
# Therefore it is safest to break the loop if fetchall is used
if fetch_size <= 0:
break
        finally:
            try:
                self.cursor.close()
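As a hedged sanity check on the fetchmany/fetchall split above (not part of the PR; sqlite3 and the `t` table are stand-ins for any PEP 249 source), both code paths should yield the same dictionaries, with the fetchall path taken exactly once per iteration:

```python
import sqlite3
from pygrametl.datasources import SQLSource

def rows(fetch_size):
    # Fresh in-memory database per call so the two paths stay independent.
    conn = sqlite3.connect(":memory:")
    conn.executescript("CREATE TABLE t (a INTEGER); "
                       "INSERT INTO t VALUES (1), (2), (3);")
    return list(SQLSource(conn, "SELECT a FROM t", fetch_size=fetch_size))

# fetch_size=2 exercises the fetchmany loop; 0 and -1 exercise the single
# fetchall call that the loop breaks out of afterwards.
assert rows(2) == rows(0) == rows(-1) == [{'a': 1}, {'a': 2}, {'a': 3}]
```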