Speeding up ETLHelper’s API transfers with asyncio
ETLHelper is a Python library developed at BGS for reading from and writing to databases. It makes it easy to run a SQL query and transform the results into JSON objects suitable for uploading to an HTTP API.
In some of our projects, we have begun to use Python’s asynchronous capabilities to perform concurrent API uploads. As a result, we have seen speed increases of up to 10x compared to our previous method of posting data sequentially.
This post compares the two methods and shows how simple the asyncio code can be.
Comparison of sequential and concurrent API upload code
The code below is taken from the Recipes section of the ETLHelper documentation, which includes an example ETL script to transfer data from an Oracle database to an ElasticSearch API. The script originally described posting items sequentially using the Requests library and has now been updated to use concurrent processing with the aiohttp library.
Only the most relevant parts of code are shown here - see the ETLHelper documentation for full details.
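The snippets below also rely on some imports and configuration (database connection details, the SQL query, API URL and request headers) that are defined elsewhere in the recipe. A rough sketch of that setup is shown here; the connection details, query and URL are illustrative placeholders rather than the values used in the real script.

import asyncio
import datetime as dt
import json
import logging

import aiohttp
import requests
from etlhelper import DbParams, iter_chunks, iter_rows

logger = logging.getLogger("copy_sensors")

# Illustrative database connection details - not the real configuration
ORACLE_DB = DbParams(dbtype='ORACLE', host='localhost', port=1521,
                     dbname='mydb', user='oracle_user')

# Illustrative query and API settings
SELECT_SENSORS = """
    SELECT code, description, date_updated
      FROM sensors
     WHERE date_updated BETWEEN :startdate AND :enddate
    """
BASE_URL = "http://localhost:9200/"
HEADERS = {'Content-Type': 'application/json'}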
Sequential posting with Requests
ETLHelper provides the iter_rows function, which returns a generator that yields a single result from the database with each iteration. Results are fetched from the database only as required, instead of all being loaded into memory first. This makes it suitable for transferring large quantities of data.
In this case, the iter_rows call runs a SQL query (SELECT_SENSORS) against the database and applies a transform function (transform_sensors) to convert each result into a Python dictionary that can be easily converted to JSON. The resulting item is posted to the API.
def copy_sensors(startdate, enddate):
    """Read sensors from Oracle and post to REST API."""
    with ORACLE_DB.connect('ORACLE_PASSWORD') as conn:
        # Iterate over rows individually
        for item in iter_rows(SELECT_SENSORS, conn,
                              parameters={"startdate": startdate,
                                          "enddate": enddate},
                              transform=transform_sensors):
            post_item(item)
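The transform_sensors function itself is not shown in the excerpt. In the ETLHelper recipe, transform functions operate on a whole chunk (list) of rows at a time and return a new list; a minimal sketch, with hypothetical column names, might look like this:

def transform_sensors(chunk):
    """Convert database rows into dictionaries ready for JSON encoding."""
    new_chunk = []
    for row in chunk:
        # Attribute names here are hypothetical - they depend on SELECT_SENSORS
        new_chunk.append({
            "sensor_code": row.code,
            "description": row.description,
            "date_updated": row.date_updated.isoformat(),
        })
    return new_chunk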
Requests is an HTTP library for Python that can be used to communicate with APIs. The post_item function uses Requests to post the item and raises an exception if something goes wrong.
def post_item(item):
    """Post a single item to API."""
    # Post data to API
    response = requests.post(BASE_URL + 'sensors/_doc', headers=HEADERS,
                             data=json.dumps(item))

    # Check for failed rows
    if response.status_code >= 400:
        logger.error('The following item failed: %s\nError message:\n(%s)',
                     item, response.text)
        response.raise_for_status()
This code is very simple, but it can be slow as the Python interpreter has to wait for a response from the API before it can proceed to the next item.
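To put rough numbers on it: if each round trip to the API takes around 50 ms, posting 5,000 items one at a time spends roughly 250 seconds (over four minutes) just waiting for responses, no matter how quickly the database can supply the rows.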
Concurrent posting with aiohttp and asyncio
The aiohttp library is similar to Requests, but it is based on Python’s asyncio library for asynchronous execution. This provides an efficient way to make multiple concurrent API calls.
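If you have not used aiohttp before, the key difference from Requests is that network calls happen inside coroutines (async def functions) and are awaited. The following standalone snippet, which is not part of the ETL script and uses an arbitrary URL, shows the basic pattern:

import asyncio

import aiohttp


async def fetch_status(url):
    """Make a single GET request and return the HTTP status code."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response.status


# asyncio.run() starts an event loop and runs the coroutine to completion
print(asyncio.run(fetch_status("https://example.com")))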
In the concurrent version, we use the iter_chunks function to pull the data from the database. This returns a generator that yields a list of results (5000 at a time by default) with each iteration.
The post_chunk function posts the results in each chunk concurrently. Because post_chunk is an asynchronous function, it needs to be called with asyncio.run(). Otherwise, the copy_sensors function is very similar to before.
def copy_sensors(startdate, enddate):
    """Read sensors from Oracle and post to REST API."""
    with ORACLE_DB.connect('ORACLE_PASSWORD') as conn:
        # Iterate over chunks of rows
        for chunk in iter_chunks(SELECT_SENSORS, conn,
                                 parameters={"startdate": startdate,
                                             "enddate": enddate},
                                 transform=transform_sensors):
            asyncio.run(post_chunk(chunk))
Two functions are required for asynchronously posting to the API: one to post a single item, and another to call the first concurrently for all of our items. post_chunk handles the concurrency. It builds a list of tasks, one for each item, then calls asyncio.gather() to execute them asynchronously (and collect the results if required). An aiohttp.ClientSession allows the same connection to the server to be reused for each item in the chunk.
async def post_chunk(chunk):
    """Post multiple items to API asynchronously."""
    async with aiohttp.ClientSession() as session:
        # Build list of tasks
        tasks = []
        for item in chunk:
            tasks.append(post_item(item, session))

        # Process tasks in parallel.  An exception in any will be raised.
        await asyncio.gather(*tasks)
The post_item function is similar to the Requests version. The main differences are the use of await keywords and that response.text is an awaitable method here, whereas it is an attribute in Requests. Also, it is essential to log any error information before raising an exception if something goes wrong; otherwise, it is not possible to access all the state information for the awaitable response via a debugger.
async def post_item(item, session):
    """Post a single item to API using existing aiohttp Session."""
    # Post the item
    response = await session.post(BASE_URL + 'sensors/_doc', headers=HEADERS,
                                  data=json.dumps(item))

    # Log responses before throwing errors because error info is not included
    # in generated Exceptions and so cannot otherwise be seen for debugging.
    if response.status >= 400:
        response_text = await response.text()
        logger.error('The following item failed: %s\nError message:\n(%s)',
                     item, response_text)
        response.raise_for_status()
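For completeness, a script assembled from these functions could be driven by something like the block below; the logging setup and date range are only examples and assume the imports sketched earlier.

if __name__ == "__main__":
    # Illustrative driver: copy everything updated since the start of 2000
    logging.basicConfig(level=logging.INFO)
    copy_sensors(dt.datetime(2000, 1, 1), dt.datetime.now())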
Conclusion
Asynchronous frameworks based on the asyncio library are relatively new to Python (asyncio was introduced in Python 3.4) and have a reputation for being difficult to understand. Hopefully this blog post demonstrates how aiohttp can be used to make concurrent API requests and how well it integrates with ETLHelper's iter_chunks function.
If you would like to take a deeper look at asyncio, I recommend reading Real Python's Async IO in Python tutorial.