Make threading._register_atexit public?

Overview

import functools

_threading_atexits = []
_SHUTTING_DOWN = False

def _register_atexit(func, *arg, **kwargs):
    """CPython internal: register *func* to be called before joining threads.

    The registered *func* is called with its arguments just before all
    non-daemon threads are joined in `_shutdown()`. It provides a similar
    purpose to `atexit.register()`, but its functions are called prior to
    threading shutdown instead of interpreter shutdown.

    For similarity to atexit, the registered functions are called in reverse.
    """
    if _SHUTTING_DOWN:
        raise RuntimeError("can't register atexit after shutdown")

    call = functools.partial(func, *arg, **kwargs)
    _threading_atexits.append(call)
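For illustration, here is a minimal sketch of how this private hook is used (the `cleanup` function and its labels are hypothetical). Registered callbacks run in reverse registration order, before non-daemon threads are joined:

```python
import threading

def cleanup(label):
    # Runs during threading._shutdown(), before non-daemon threads
    # are joined (unlike atexit.register, which runs even later).
    print("cleanup:", label)

# Private API: at shutdown the list is iterated in reverse,
# so "second" would print before "first".
threading._register_atexit(cleanup, "first")
threading._register_atexit(cleanup, "second")
```

Because the hook is private, this only works on Python 3.9+ and may change without notice.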

More context: https://bugs.python.org/issue41962

Messages (10)

msg378144 - Author: Ben Darnell (Ben.Darnell) Date: 2020-10-07 01:09

I'm dealing with a subtle deadlock involving concurrent.futures.ThreadPoolExecutor, and my solution that worked in Python 3.8 broke with 3.9. I'm running some long-running (possibly infinite) tasks in the thread pool, and I cancel them in an atexit callback so that everything can shut down cleanly (before ThreadPoolExecutor joins all worker threads in its own atexit hook).

Python 3.9 broke this due to https://bugs.python.org/issue39812. That change introduced a new atexit-like mechanism to the threading module and uses it where Python 3.8 used regular atexit. This means that ThreadPoolExecutor's atexit runs before mine, and since I never get a chance to cancel my tasks, it deadlocks.

One way I can solve this is to move my own atexit function to threading._register_atexit, so my strawman proposal here is to make that function public and documented.

On the other hand, even without the change in Python 3.9, my use of atexit smells like an abuse of implementation details in ThreadPoolExecutor (getting the atexit callbacks called in the right order was tricky when the concurrent.futures module started using lazy loading in Python 3.7). So I would welcome other suggestions about how to handle long-running but cancelable operations in a ThreadPoolExecutor at shutdown.

One clean solution is to do the cancellation at the end of the main module instead of in an atexit hook. However, I'm writing a library (so I don't control the main module), and atexit is the only way I have to ensure that this happens. Another option is to forgo ThreadPoolExecutor entirely and manage the threads myself.

My code in question is in a not-yet-released branch of Tornado: https://github.com/tornadoweb/tornado/blob/5913aa43ecfdaa76876fc57867062227b907b1dd/tornado/platform/asyncio.py#L57-L73

With the master branch of Tornado, Python 3.9, and Windows, python -c "from tornado.httpclient import HTTPClient; c = HTTPClient()" reliably deadlocks at interpreter shutdown.

msg385589 - Author: Kyle Stanley (aeros) (Python committer) Date: 2021-01-24 19:30

> I'm dealing with a subtle deadlock involving concurrent.futures.ThreadPoolExecutor, and my solution that worked in Python 3.8 broke with 3.9. I'm running some long-running (possibly infinite) tasks in the thread pool, and I cancel them in an atexit callback so that everything can shut down cleanly (before ThreadPoolExecutor joins all worker threads in its own atexit hook).

IMO, a better practice would be to give those potentially infinite-running tasks a direct means of escape and to invoke it before calling executor.shutdown(); that would be a more reliable approach. But perhaps there is some convenience in being able to provide custom atexit hooks. It might also help the user to separate the shutdown logic from the rest of the program.
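A sketch of that escape-hatch pattern (the names `stop` and `worker` are illustrative): the task watches an event instead of blocking forever, and the event is set before shutdown so the worker thread can actually finish and be joined:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

stop = threading.Event()

def worker():
    # A long-running task that polls an escape hatch
    # instead of blocking indefinitely.
    while not stop.wait(timeout=0.05):
        pass
    return "stopped cleanly"

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(worker)

# Invoke the escape hatch *before* shutdown, so the worker
# can return and the pool's threads can be joined.
stop.set()
print(future.result())
executor.shutdown()
```

The key point is ordering: the escape signal must happen before `executor.shutdown()` (or before the executor's own atexit hook runs), otherwise the join blocks forever.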

Since you worked with me in adding threading._register_atexit(), do you have any thoughts, Antoine? I would personally not be opposed to making it public, assuming there's real utility in doing so.

My only concern is that it might be a potential foot-gun. If the user submits an atexit hook that deadlocks, it might prevent threads from shutting down safely prior to interpreter finalization. I'm presently undecided whether explicitly mentioning this in the docs would be sufficient warning.

msg385598 - Author: Ben Darnell (Ben.Darnell) Date: 2021-01-25 03:09

> IMO, a better practice would be providing those potentially infinite running tasks a direct method of escape and invoking it before calling executor.shutdown(), it would be a more reliable approach.

Agreed, but the problem is that I'm in a library (so I don't control the main module), and the library's interface does not mandate any sort of explicit shutdown method. There is a shutdown method, but almost no one calls it, and it had never caused a problem until Python 3.9 changed things so that it deadlocks.

> My only concern is that it might be a potential foot-gun. If the user submits an atexit hook that deadlocks, it might prevent threads from shutting down safely prior to interpreter finalization.

Yes, and that is exactly the problem. concurrent.futures submits an atexit hook whose behavior depends on application code, and through that I have inadvertently caused a deadlock.

msg385688 - Author: Ben Darnell (Ben.Darnell) Date: 2021-01-26 01:50

I have resolved my issue here by moving from ThreadPoolExecutor to a plain threading.Thread that I manage by hand (https://github.com/tornadoweb/tornado/commit/15832bc423c33c9280564770046dd6918f3a31b4). Therefore I no longer need this for myself, and I leave it up to you to decide whether there's anything worth doing at this point.

msg412438 - Author: Simon Arlott (sa) Date: 2022-02-03 11:44

Another way to do this is to call threading.main_thread().join() in another thread and do the shutdown cleanup when it returns.

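A minimal sketch of this approach (the `watcher` function and the printed messages are illustrative); run it as a script so the main thread actually exits:

```python
import threading

def watcher():
    # main_thread().join() returns once the main thread has finished,
    # just before threading's atexit callbacks run. Non-daemon threads
    # are joined after that, so this cleanup still completes.
    threading.main_thread().join()
    print("cleanup after main thread exited")

threading.Thread(target=watcher).start()
print("main thread done")
```

Run as a script, "main thread done" is printed first, then the watcher wakes up at shutdown and performs its cleanup.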
The main thread is stopped at shutdown just before the threading._threading_atexits are called.

msg412470 - Author: Eric Snow (eric.snow) (Python committer) Date: 2022-02-03 21:19

> I'm running some long-running (possibly infinite) tasks in the thread pool, and I cancel them in an atexit callback

To be clear, by "cancel" you are not talking about Future.cancel(). Rather, your handler causes all running tasks to finish (by sending a special message on the socket corresponding to each running task). Is that right?

Some other things I inferred from your atexit handler:

  • it does not make sure the task associated with the socket finishes (no way of knowing?)
  • so if a task hangs while trying to stop then the running thread in the ThreadPoolExecutor would block shutdown forever
  • similarly, if a task is stuck handling a request then it will never receive the special message on the socket, either blocking the send() in your handler or causing ThreadPoolExecutor shutdown/atexit to wait forever
  • it vaguely implies a 1-to-1 relationship between sockets and running tasks
  • likewise that pending (queued) tasks do not have an associated socket (until started)
  • so once your handler finishes, any tasks pending in the ThreadPoolExecutor queue will eventually get started but never get stopped by your handler; thus you're back to the deadlock situation

Does all that sound right? Most of that is relevant to some possible solutions I have in mind.

msg412471 - Author: Eric Snow (eric.snow) (Python committer) Date: 2022-02-03 21:21

> I'm running some long-running (possibly infinite) tasks in the thread pool, and I cancel them in an atexit callback

Alternately, perhaps ThreadPoolExecutor isn't the right fit here, as implied by the route you ended up taking. It seems it isn't well-suited to long-running (or infinite) tasks. In that case, perhaps the concurrent.futures docs could be clearer about when ThreadPoolExecutor is not a good fit and what the alternatives are.

msg412472 - Author: Eric Snow (eric.snow) (Python committer) Date: 2022-02-03 21:27

FWIW, here's a brain dump about ThreadPoolExecutor and its atexit handler after having looked at the code.


First, the relationship between the objects involved:

  • work item -> Future
  • work item -> task (e.g. function)
  • queue -> [work item]
  • worker -> executor
  • worker -> queue
  • worker -> currently running work item
  • thread -> worker
  • ThreadPoolExecutor -> [thread]
  • ThreadPoolExecutor -> queue
  • global state -> {thread: queue}

Observations:

  • a work item's future and task are not aware of each other, and operations on either have no effect on the other
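That observation is easy to demonstrate: once a work item has started running, Future.cancel() returns False and has no effect on the task itself. A small sketch (the two events are illustrative, used only to control timing):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

def task():
    started.set()
    release.wait()        # keep the task "running"
    return "finished anyway"

with ThreadPoolExecutor(max_workers=1) as ex:
    future = ex.submit(task)
    started.wait()
    # cancel() fails once the work item is running, and nothing
    # on the Future side reaches into the task to stop it.
    cancelled = future.cancel()
    release.set()
    print(cancelled, future.result())
```

Cancellation only succeeds for work items still sitting in the queue; a running task must provide its own way to stop.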

Next, let's look at the relevant ways the objects can be used:

  • publicly
      • ThreadPoolExecutor.submit(task) -> Future
      • ThreadPoolExecutor.shutdown()
      • Future.result() and Future.exception()
      • Future.cancel()
      • Future.add_done_callback()
  • internally
      • queue.pop() -> work item
      • .run()
      • thread.join()
      • Future.set_running_or_notify_cancel()
      • Future.set_result() and Future.set_exception()

Observations:

  • nothing interacts with a worker directly; it is waited on via its thread and it receives work (or None) via the queue it was given
  • once a worker pops the next work item off the queue, nothing