

How do I make sure httpx calls are run in parallel?

Asked 2 years, 5 months ago. Modified 2 years, 4 months ago.


I was recommended httpx as a way to perform API requests in parallel, with a nice API like that of requests.

My code:

import asyncio
import time

import httpx


async def main():
    t0 = time.time()

    usernames = [
        "author",
        "abtinf",
        "TheCoelacanth",
        "tomcam",
        "chauhankiran",
        "ulizzle",
        "ulizzle",
        "ulizzle",
        "cratermoon",
        "Aeolun",
        "ulizzle",
        "firexcy",
        "kazinator",
        "blacksoil",
        "lucakiebel",
        "ozim",
        "tomcam",
        "jstummbillig",
        "tomcam",
        "johnchristopher",
        "Tade0",
        "lallysingh",
        "paulddraper",
        "WilTimSon",
        "gumby",
        "kristopolous",
        "zemo",
        "aschearer",
        "why-el",
        "Osiris",
        "mdaniel",
        "ianbutler",
        "vinaypai",
        "samtho",
        "chazeon",
        "taeric",
        "yellowapple",
        "Kye",
    ]

    bios = []

    headers = {"User-Agent": "curl/7.72.0"}
    async with httpx.AsyncClient(headers=headers) as client:
        for username in usernames:
            url = f"https://hn.algolia.com/api/v1/users/{username}"
            response = await client.get(url)
            data = response.json()
            bios.append(data['about'])
            print('.')

    t1 = time.time()
    total = t1 - t0
    print(bios)
    print(f"Total time: {total} seconds")  # 11 seconds async


asyncio.run(main())

How do I make sure that this example runs with the requests in parallel?

Tags: python-asyncio, httpx

asked May 11, 2023 at 23:24 by Harry Moreno

Comments:

You can't make sure that the requests are running in parallel, because there is no possibility that they are. Your script has one Task (main()) and doesn't create any extra threads or processes. The one and only Task will await each transaction (await client.get(url)) before proceeding to the next transaction. No parallel processing, multithreading or multitasking can happen. This Task can potentially multitask with another Task, which will proceed while main() is awaiting. – Paul Cornelius, May 11, 2023 at 23:45

Looks like a simple fix: just create a task per request (which might get you banned for DDoS-ing on some servers), or go for a producer-consumer pattern and create a fixed number of consumer tasks with a queue. I will answer in a bit more detail after a few hours. – jupiterbjy, May 12, 2023 at 8:37

3 Answers


Daniil has already answered in great detail, so I'll just add a simple illustration of how IO works, plus an alternative design choice.

About how IO works

As Daniil said, asyncio does not provide parallelism, but it does provide concurrency.

However, we can achieve IO parallelism in Python because Python doesn't actually do any of the IO work itself (nor does pretty much any user program). The OS does. Meanwhile, all Python does is wait.

And even the CPU doesn't have to constantly poll every device to check whether IO is done. Each individual device sends a signal (an interrupt) to the CPU, and only then does the CPU check which device's IO work is done.

So, from a process's or thread's perspective, IO looks more like this:

"Hey OS, please do this IO work for me. Wake me up when it's done."
Thread 1 goes to sleep.

Some time later, the OS punches Thread 1: "Your IO operation is done, take this and get back to work."

The OS does the IO work for you, and it also punches you out of your sleep once the device's interrupt arrives.

This is why many applications and frameworks (including asyncio) use threading to improve throughput in Python, despite the Global Interpreter Lock (GIL) limiting Python code to running in only one thread at any given time.

That is, even though parallel execution is limited, Python's low-level IO code written in C releases the GIL while waiting for the OS to do its IO work, so other threads' Python code can do something useful in the meantime.
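A minimal sketch of that point, using only the standard library. Here `time.sleep` is an assumed stand-in for a blocking IO call (it also releases the GIL while waiting), and the names `blocking_wait` and `run_in_threads` are made up for this illustration:

```python
import threading
import time


def blocking_wait(seconds: float) -> None:
    # Stand-in for a blocking IO call: the thread sleeps and the GIL is
    # released, so other threads are free to run in the meantime.
    time.sleep(seconds)


def run_in_threads(n_threads: int, seconds: float) -> float:
    """Start n_threads waiting threads and return the total elapsed time."""
    threads = [
        threading.Thread(target=blocking_wait, args=(seconds,))
        for _ in range(n_threads)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = run_in_threads(5, 0.2)
    # The waits overlap, so the total is close to one wait, not five.
    print(f"5 threads, 0.2 s wait each: {elapsed:.2f} s total")
```

If the waits did not overlap, five threads would need about one second in total. Because each thread gives up the GIL while it sleeps, the total should stay close to a single wait.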

So, TL;DR: the script itself is not parallel, but IO can be. All network jobs are sent (though not simultaneously), and then the script waits for the servers' responses simultaneously (which means doing nothing until the OS interrupt).
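The TL;DR can be sketched without any network access. In the sketch below, `asyncio.sleep` is an assumed placeholder for waiting on a server's response, and `fake_request` and `timed_gather` are hypothetical helper names, not part of httpx:

```python
import asyncio
import time


async def fake_request(name: str, delay: float) -> str:
    # Placeholder for waiting on a server's response.
    await asyncio.sleep(delay)
    return name


async def timed_gather(delay: float, count: int) -> tuple[list[str], float]:
    """Launch `count` fake requests at once; return results and elapsed time."""
    start = time.perf_counter()
    results = await asyncio.gather(
        *(fake_request(f"user{i}", delay) for i in range(count))
    )
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    names, elapsed = asyncio.run(timed_gather(0.2, 10))
    # Ten waits of 0.2 s overlap instead of adding up to 2 s.
    print(names)
    print(f"Elapsed: {elapsed:.2f} s")
```

The requests are started one after another by a single thread, but all of them are then waited on at the same time, which is where the time saving comes from.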

An example

As for the producer-consumer-ish example: well, it is more like a pool, because there is no producer-consumer pair. Servers will usually ban you or cut the connection when there are a lot of simultaneous connections.

But with this approach, we can guarantee that there will be at most 3 simultaneous connections, which won't make the server angry.

server.py receives a GET request, waits a random amount of time, and responds:

import asyncio
from random import randint

from quart import request, jsonify, Quart

app = Quart("Very named Much app")


@app.get("/json")
async def send_json():
    """
    Sleeps 0~4 seconds before returning response.

    Returns:
        json response
    """
    key = request.args["user"]
    print("Received " + key)

    await asyncio.sleep(randint(0, 4))
    return jsonify({"user": key})


asyncio.run(app.run_task())

client.py:

import asyncio

import httpx


async def request_task(id_, in_queue: asyncio.Queue, out_queue: asyncio.Queue):
    """Get json response data from url in queue. It's Consumer and also Producer.

    Args:
        id_: task ID
        in_queue: Queue for receiving url
        out_queue: Queue for returning data
    """
    print(f"[Req. Task {id_}] Started!")

    # create context for each task
    async with httpx.AsyncClient() as client:
        while True:
            user = await in_queue.get()
            print(f"[Req. Task {id_}] Processing user '{user}'")

            data = await client.get("http://127.0.0.1:5000/json?user=" + str(user))

            # do what you want here
            print(f"[Req. Task {id_}] Received {data}")
            await out_queue.put(data)

            # inform queue that we are done with data we took
            in_queue.task_done()


async def main():
    """
    Starter code
    """

    # create queues
    in_queue = asyncio.Queue()
    out_queue = asyncio.Queue()

    # create consumer tasks
    pool = [asyncio.create_task(request_task(n, in_queue, out_queue)) for n in range(3)]

    # populate queue with numbers as user's name
    for n in range(30):
        in_queue.put_nowait(n)

    # wait until all enqueued work is complete
    await in_queue.join()

    # cancel tasks
    for task in pool:
        task.cancel()

    # check data
    print(f"[Main task] Processed {out_queue.qsize()} data!")


if __name__ == '__main__':
    asyncio.run(main())

Output:

[Req. Task 0] Started!
[Req. Task 0] Processing user '0'
[Req. Task 1] Started!
[Req. Task 1] Processing user '1'
[Req. Task 2] Started!
[Req. Task 2] Processing user '2'
[Req. Task 2] Received
[Req. Task 2] Processing user '3'
[Req. Task 1] Received
[Req. Task 1] Processing user '4'
[Req. Task 2] Received
[Req. Task 2] Processing user '5'
[Req. Task 0] Received
[Req. Task 0] Processing user '6'
...

[Req. Task 2] Received
[Req. Task 2] Processing user '22'
[Req. Task 1] Received
[Req. Task 1] Processing user '23'
[Req. Task 0] Received
[Req. Task 0] Processing user '24'
[Req. Task 1] Received
[Req. Task 1] Processing user '25'
[Req. Task 1] Received
[Req. Task 1] Processing user '26'
[Req. Task 2] Received
[Req. Task 2] Processing user '27'
[Req. Task 0] Received
[Req. Task 0] Processing user '28'
[Req. Task 1] Received
[Req. Task 1] Processing user '29'
[Req. Task 1] Received
[Req. Task 2] Received
[Req. Task 0] Received
[Main task] Processed 30 data!

answered May 12, 2023 at 17:21, edited May 12, 2023 at 17:35 by jupiterbjy


First of all, Python's asyncio does not provide true parallelism (as has been discussed repeatedly on this platform). The event loop runs in a single thread.

The concurrency just allows context switches between multiple coroutines, while they are awaiting some I/O operation to finish, like for example an HTTP request. But the requesting function must be implemented in a particular, non-blocking way for this to work. The httpx package apparently provides such functions.
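To illustrate why the non-blocking implementation matters, here is a small sketch under stated assumptions: `time.sleep` plays the role of a blocking request function and `asyncio.sleep` the role of a properly awaitable one. The helper names are invented for this example:

```python
import asyncio
import time


async def blocking_call(delay: float) -> None:
    # time.sleep never yields to the event loop, so nothing else can run.
    time.sleep(delay)


async def non_blocking_call(delay: float) -> None:
    # asyncio.sleep hands control back to the event loop while waiting.
    await asyncio.sleep(delay)


async def time_calls(func, delay: float, count: int) -> float:
    """Run `count` calls of `func` via gather and return the elapsed time."""
    start = time.perf_counter()
    await asyncio.gather(*(func(delay) for _ in range(count)))
    return time.perf_counter() - start


if __name__ == "__main__":
    blocking = asyncio.run(time_calls(blocking_call, 0.1, 5))
    non_blocking = asyncio.run(time_calls(non_blocking_call, 0.1, 5))
    print(f"blocking:     {blocking:.2f} s")      # roughly the sum of all waits
    print(f"non-blocking: {non_blocking:.2f} s")  # roughly one wait
```

Even though both versions go through `asyncio.gather`, only the awaitable one lets the event loop switch between coroutines while they wait.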

As has been pointed out in the comments, you are not getting any concurrency in your code because you are awaiting each request made by the client sequentially in a for-loop. In other words, there is no chance for a new request to be launched, until the previous one returns completely.

A common pattern to concurrently execute the same coroutine with different arguments is to use asyncio.gather. I would suggest factoring out the entire GET request, as well as the retrieval of the about section of the returned data, into its own coroutine function and executing however many of those you deem appropriate concurrently:

import asyncio
import time

import httpx

BASE_URL = "https://hn.algolia.com/api/v1/users"


async def get_bio(username: str, client: httpx.AsyncClient) -> str:
    response = await client.get(f"{BASE_URL}/{username}")
    print(".")
    data = response.json()
    return data["about"]


async def main() -> None:
    t0 = time.time()
    usernames = [
        "author",
        "abtinf",
        "TheCoelacanth",
        # ...
    ]
    headers = {"User-Agent": "curl/7.72.0"}
    async with httpx.AsyncClient(headers=headers) as client:
        bios = await asyncio.gather(*(get_bio(user, client) for user in usernames))
    print(dict(zip(usernames, bios)))
    print(f"Total time: {time.time() - t0:.3} seconds")


asyncio.run(main())

Sample output:

.
.
.
{'author': '', 'abtinf': 'You can reach me at abtinf@gmail.com or @abtinf.', 'TheCoelacanth': 'thecoelacanth@gmail.com'}
Total time: 0.364 seconds

Since this approach allows a great number of HTTP requests to be made in a very short amount of time (because you are not awaiting previous responses before launching more requests), there is always the danger of being subjected to rate limiting or being blocked outright by the API. I don't know anything about this API in particular though. So I don't know if your list of user names is already "too long".

If you are interested in a flexible control mechanism to manage a pool of asynchronous tasks, I wrote the asyncio-taskpool package to make this easier for my own applications. TaskPool.map allows you to set a specific maximum number of tasks to work concurrently on an arbitrary iterable of arguments. This could help with the rate limiting issue.
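For readers who prefer to stay with the standard library, a cap on concurrent requests can also be sketched with asyncio.Semaphore. This is not the asyncio-taskpool API, only a substitute technique; `fetch`, `fetch_all`, and the `stats` dictionary are hypothetical names, and `asyncio.sleep` stands in for the real `await client.get(...)`:

```python
import asyncio


async def fetch(user: str, sem: asyncio.Semaphore, stats: dict) -> str:
    # Only `limit` coroutines can be inside this block at the same time.
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.05)  # placeholder for `await client.get(...)`
        stats["active"] -= 1
    return f"bio of {user}"


async def fetch_all(users: list[str], limit: int) -> tuple[list[str], int]:
    """Fetch all users with at most `limit` requests in flight."""
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    bios = await asyncio.gather(*(fetch(u, sem, stats) for u in users))
    return list(bios), stats["peak"]


if __name__ == "__main__":
    users = [f"user{i}" for i in range(10)]
    bios, peak = asyncio.run(fetch_all(users, limit=3))
    print(bios)
    print(f"Peak number of simultaneous requests: {peak}")
```

All coroutines are still created up front with asyncio.gather, but the semaphore makes each one wait its turn, so the server never sees more than `limit` requests at once.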