在 Python 中,Process、Thread 與 Async 是三種不同的並發(concurrent)機制,但它們的行為與適用情境經常被混淆。特別是在使用 FastAPI 時,若沒有理解其底層執行方式,容易寫出會阻塞(blocking)系統的程式。本文從這些基礎概念出發,整理 Python 在並發與實務應用上的行為。
Table of Contents
行程(Process)
行程(Process)是 OS 層級的執行單位。當一個程式啟動時,系統會建立一個 process,並分配獨立的記憶體空間與執行環境。每個 process 內包含一個 Python interpreter 與至少一個 thread(main thread),由 OS 負責排程。
Process 的關鍵特性是記憶體隔離。不同 process 之間無法直接共享資料,因此在執行上彼此獨立。這樣的設計提升了穩定性,但也意味著資料交換需要透過明確的機制,例如 queue 或 pipe,而不能直接存取變數。
在 Python 中,可以透過 multiprocessing 模組建立新的 process。以下程式會建立一個新的 process,並在其中執行 worker()。主程式與子 process 會有不同的 PID,表示它們是獨立的執行單位。
from multiprocessing import Process
import os
def worker():
print(f"Worker PID: {os.getpid()}")
if __name__ == "__main__":
print(f"Main PID: {os.getpid()}")
p = Process(target=worker)
p.start()
p.join()
# Output
Main PID: 1633
Worker PID: 1641由於每個 process 都有自己的 Python interpreter,因此也各自擁有一個 GIL。這代表多個 process 可以同時在不同 CPU 核心上執行 Python 程式,而不會互相影響。這個特性使得 process 特別適合 CPU-bound 任務。在多核心環境下,這些 process 可以同時執行,達到真正的平行運算。
from multiprocessing import Process
def compute(seq):
for i in range(10**7):
print(f"Process {seq} is running, iteration {i}")
if __name__ == "__main__":
processes = []
for seq in range(4):
p = Process(target=compute, args=(seq,))
processes.append(p)
p.start()
for p in processes:
p.join()
# Output
Process 3 is running, iteration 308566Process 2 is running, iteration 307463
Process 2 is running, iteration 307464
Process 3 is running, iteration 308567
Process 2 is running, iteration 307465Process 1 is running, iteration 309200
Process 2 is running, iteration 307466Process 1 is running, iteration 309201
Process 1 is running, iteration 309202Process 0 is running, iteration 306115
Process 3 is running, iteration 308568
Process 0 is running, iteration 306116Process 3 is running, iteration 308569
...當需要在 process 之間傳遞資料時,必須使用 IPC(inter-process communication)機制,例如 queue。這種方式雖然安全,但會增加資料傳輸與序列化的成本。
from multiprocessing import Process, Queue
def worker(q):
q.put("hello")
if __name__ == "__main__":
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
print(q.get())
p.join()
# Output
hello整體而言,Process 適合用於彼此獨立、計算密集的任務,特別是需要利用多核心 CPU 的情境。但由於建立成本較高且資料交換不方便,通常會用在較粗粒度的工作,而不是頻繁且細小的操作。
執行緒(Thread)
執行緒(Thread)是在 process 內部運行的執行單位。同一個 process 中的 threads 共享記憶體空間與資源,因此可以直接存取相同的資料結構與變數。這讓 thread 在資料交換上非常直接,但也意味著需要自行處理同步問題,例如避免多個 thread 同時修改同一個資料。
在 Python 中,每個 process 預設會有一個 main thread,額外的 thread 可以透過 threading.Thread() 建立。這些 thread 由 OS 負責排程,因此在系統層面上,它們可以被分配到不同 CPU 核心。
在以下的例子中,兩個 thread 會並發執行,並在等待 sleep() 時讓出執行權,使其他 thread 有機會運行。
import threading
import time
def worker(name):
print(f"{name} start")
time.sleep(2)
print(f"{name} end")
t1 = threading.Thread(target=worker, args=("A",))
t2 = threading.Thread(target=worker, args=("B",))
t1.start()
t2.start()
t1.join()
t2.join()
# Output
A start
B start
B end
A end在 CPython 中,thread 的行為會受到 GIL(Global Interpreter Lock)的影響。GIL 是 Python interpreter 的一個全域鎖,用來確保同一時間只有一個 thread 可以執行 Python bytecode。這個限制不是來自 CPU,而是來自 Python interpreter 的設計。
這代表在純 Python 的計算任務中,多個 thread 無法同時執行,而是輪流取得 GIL 後執行。即使在多核心 CPU 上,這些 thread 仍然無法達到真正的平行運算效果。
以下的程式中,即使建立多個 thread,整體執行時間通常不會顯著縮短,因為計算仍然受到 GIL 的限制。
import threading
def compute(seq):
for i in range(10**7):
print(f"Thread {seq} is running, iteration {i}")
threads = []
for seq in range(2):
t = threading.Thread(target=compute, args=(seq,))
threads.append(t)
t.start()
for t in threads:
t.join()
# Output
Thread 0 is running, iteration 590839
Thread 0 is running, iteration 590840
Thread 0 is running, iteration 590841
Thread 0 is running, iteration 590842
Thread 0 is running, iteration 590843
Thread 0 is running, iteration 590844
Thread 0 is running, iteration 590845
Thread 0 is running, iteration 590846
Thread 0 is running, iteration 590847
Thread 0 is running, iteration 590848
Thread 1 is running, iteration 618463
Thread 1 is running, iteration 618464
Thread 0 is running, iteration 590849
Thread 0 is running, iteration 590850
Thread 0 is running, iteration 590851
Thread 0 is running, iteration 590852
Thread 0 is running, iteration 590853
Thread 0 is running, iteration 590854
Thread 0 is running, iteration 590855
Thread 0 is running, iteration 590856
Thread 0 is running, iteration 590857Thread 1 is running, iteration 618465
Thread 1 is running, iteration 618466
Thread 1 is running, iteration 618467然而,GIL 並不會完全阻止並發。在進行 I/O 操作時,例如檔案讀寫或網路請求,Python 會釋放 GIL,使其他 thread 能夠繼續執行。因此,在 I/O-bound 的情境中,多個 thread 可以在等待 I/O 的期間交錯執行,達到良好的並發效果。
由於 threads 共享記憶體,當多個 thread 同時修改資料時,必須使用同步機制來確保結果正確。最常見的方式是使用 lock。如果沒有 lock,以下程式的結果可能會不正確,因為多個 thread 會同時修改同一個變數。
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock:
counter += 1
threads = []
for _ in range(2):
t = threading.Thread(target=increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(counter)
# Output
200000整體而言,Thread 適合用於需要共享資料且以 I/O 為主的任務。它的優點是建立成本低、資料交換直接,但在 CPU-bound 任務中無法提供真正的平行效能,並且需要額外處理同步問題。GIL 是理解 thread 行為的關鍵因素,因為它直接影響 thread 在不同類型任務中的表現。
非同步(Async)
非同步(Async)是一種以事件驅動為核心的並發模型,其設計目標是在單一 thread 中有效地處理大量任務。與 thread 依賴 OS 進行排程不同,async 的任務切換是由程式本身主動控制,這種方式通常稱為 cooperative multitasking。
在 Python 中,async def 定義的是 coroutine。呼叫 coroutine 並不會立即執行,而只是建立一個 coroutine object。只有在使用 await 或將其註冊到 event loop 後,該 coroutine 才會開始執行。
以下例子顯示,直接呼叫 coroutine 並不會有任何效果,必須透過 await 才會真正執行。
import asyncio
async def run():
print("running")
async def main():
run() # not execute
await run() # execute
asyncio.run(main())Event loop 是 async 的核心元件,可以視為一個任務排程器。當 coroutine 執行到 await 時,代表它主動讓出控制權,event loop 會暫停該任務,並切換去執行其他已準備好的任務。當原本等待的操作完成後,event loop 會再回來繼續執行該 coroutine。
在這個例子中,兩個任務會在單一 thread 中交錯執行,達到並發效果。
import asyncio
async def worker(name):
print(f"{name} start")
await asyncio.sleep(2)
print(f"{name} end")
async def main():
await asyncio.gather(
worker("A"),
worker("B")
)
asyncio.run(main())
# Output
A start
B start
A end
B end除了 await,另一個常見的操作是 asyncio.create_task()。它會將 coroutine 包裝成 task,並註冊到 event loop 中執行,但不會等待其結果。
在這個例子中,worker() 會被排程執行,但 main() 不會等待它完成。這種方式適合用於需要在背景執行的任務,但若沒有適當管理,可能導致任務遺失或錯誤無法被觀察。
import asyncio
async def worker():
print("start")
await asyncio.sleep(2)
print("end")
async def main():
asyncio.create_task(worker())
print("main done")
asyncio.run(main())Async 的並發能力來自於任務在等待 I/O 時讓出控制權,因此特別適合 I/O-bound 的場景,例如大量 HTTP 請求或資料庫查詢。然而,這也代表 async 對 blocking operation 非常敏感。如果在 coroutine 中執行同步且耗時的操作,整個 event loop 會被阻塞,導致所有任務停滯。
這樣的程式會阻塞整個 event loop,失去 async 的優勢。
async def bad():
for _ in range(10**8):
pass # blocking CPU work由於 async 是單一執行緒模型,因此它並不提供平行運算能力。所有任務仍然在同一個 thread 中執行,只是透過切換來達到並發效果。若需要真正的平行運算,仍然需要使用 process 或其他機制。
整體而言,Async 適合用於高併發且以 I/O 為主的系統,例如 Web server 或 API service。它的優點是能以較低的資源成本處理大量任務,但前提是所有操作都必須是非阻塞的,並且開發者需要明確掌握任務的生命週期與執行時機。
在 Thread 中建立 Async 執行環境
在某些情境下,會希望在新的 thread 中執行 async 程式。這通常不是因為 async 本身不夠用,而是系統中已經存在 thread 架構,或希望將某一段邏輯隔離到不同的執行環境中。在這種情況下,thread 與 async 並不是互斥的,而是可以組合使用。
要讓這件事情成立,有一個前提需要先理解,event loop 是綁定在 thread 上的。換句話說,每一個 thread 若要執行 coroutine,都必須擁有自己的 event loop。這也是為什麼在新的 thread 中,不能直接使用既有的 event loop,而是需要建立一個新的執行環境。
最直接的做法,是在 thread 中呼叫 asyncio.run()。這個函式會自動建立 event loop、執行 coroutine,並在結束後關閉 loop,因此適合用於單次執行的情境。
在以下的例子中,新的 thread 會自行建立一個 event loop,並在其中執行 async_worker()。整個 async 的生命週期完全被限制在該 thread 中,彼此之間不會互相干擾。
import threading
import asyncio
async def async_worker():
print("async start")
await asyncio.sleep(1)
print("async end")
def thread_worker():
asyncio.run(async_worker())
t = threading.Thread(target=thread_worker)
t.start()
t.join()如果需要更細緻地控制 event loop,例如在同一個 thread 中重複使用,則可以手動建立 loop。這時需要先建立一個新的 event loop,並將其設定為當前 thread 的預設 loop,之後再執行 coroutine。
import threading
import asyncio
async def async_worker():
print("async start")
await asyncio.sleep(1)
print("async end")
def thread_worker():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(async_worker())
loop.close()
t = threading.Thread(target=thread_worker)
t.start()
t.join()這樣的寫法讓 event loop 的建立與銷毀更加明確,也方便在同一個 thread 中安排多個 async 任務。
需要特別注意的是,event loop 不能在不同 thread 之間共用。event loop 的設計假設它只會在建立它的 thread 中運行,因此若將同一個 loop 傳遞到其他 thread 使用,通常會導致不可預期的錯誤。實務上應該將一個 thread 對應一個 event loop 視為基本原則。
將 thread 與 async 結合使用時,可以把 thread 視為一個額外的執行容器,而 async 則負責在這個容器中管理多個任務。這種組合在需要隔離執行環境,同時又希望利用 async 處理 I/O 併發時特別有用。
阻塞操作(Blocking Operation)的正確處理方式
在 concurrency 中,一個關鍵問題是如何處理阻塞操作(blocking operation)。所謂 blocking operation,是指會長時間佔用 threads,且在此期間無法讓出控制權的操作,例如檔案讀寫、網路請求、資料庫查詢或任何同步的長時間運算。
在 thread 模型中,blocking operation 是可以接受的。當一個 thread 因為 I/O 而阻塞時,OS 可以切換到其他 thread 繼續執行,因此整體系統仍然可以維持 concurrency。這也是為什麼 thread 適合用於 I/O-bound 任務。
在這個例子中,檔案讀取是 blocking operation,但它只會阻塞該 thread,而不會影響其他 threads。
import threading
def read_file():
with open("data.txt", "r") as f:
return f.read()
t = threading.Thread(target=read_file)
t.start()
t.join()在 async 模型中,情況則完全不同。由於 async 通常在單一 thread 中運行,所有任務都由同一個 event loop 管理。如果在 async function 中執行 blocking operation,整個 event loop 會被阻塞,導致其他所有任務無法執行。
這段程式會阻塞整個 event loop,因此在 async 環境中應避免這種寫法。
async def read_file():
with open("data.txt", "r") as f:
return f.read() # blocking operation當必須使用既有的 blocking function 時,一個常見的做法是將其轉移到 thread pool 中執行。這樣可以讓 event loop 保持非阻塞,同時仍然可以使用原本的同步程式碼。
在這個例子中,blocking_read() 會在 thread pool 中執行,而不是阻塞 event loop。
import asyncio
def blocking_read():
with open("data.txt", "r") as f:
return f.read()
async def main():
loop = asyncio.get_running_loop()
data = await loop.run_in_executor(None, blocking_read)
print(data)
asyncio.run(main())FastAPI 與同步 Endpoint
在 FastAPI 中,當 endpoint 使用一般的 def 定義時,該函式不會直接在 event loop 中執行。相反地,FastAPI(實際上是透過 Starlette)會將這個 sync 函式交給 thread pool 執行,以避免阻塞主要的 event loop。整個 request lifecycle 仍然由 Uvicorn 驅動,但實際的業務邏輯會在另一個 thread 中完成。
這代表每一個同步 endpoint 呼叫,實際上會佔用 thread pool 中的一個 worker thread。若該函式包含 blocking operation,例如檔案讀寫或網路請求,這些操作只會阻塞該 thread,而不會影響其他 request 的處理。
在這個例子中,time.sleep() 是 blocking operation,但因為整個函式是在 thread pool 中執行,因此不會阻塞 event loop。然而,這並不代表可以在這裡執行長時間任務。
from fastapi import FastAPI
import time
app = FastAPI()
@app.post("/job")
def create_job():
time.sleep(5) # blocking operation
return {"status": "done"}如果一個 request 需要執行數分鐘的工作,這個 thread 將會長時間被佔用。當同時有多個 request 時,thread pool 很快會被耗盡,導致新的 request 無法被處理,進而降低整體吞吐量。此外,HTTP request 本身通常會有 timeout 限制,長任務很可能在完成之前就被中斷。
因此,即使在同步 endpoint 中,長任務也不應該直接執行。較合理的做法是讓 endpoint 僅負責建立 job,並將其放入 queue 中,由其他 worker process 負責實際執行。這樣的設計可以避免 thread 被長時間佔用,同時讓 API 能夠快速回應。
from fastapi import FastAPI
from uuid import uuid4
app = FastAPI()
queue = []
@app.post("/job")
def create_job():
job_id = str(uuid4())
queue.append(job_id)
return {"job_id": job_id}FastAPI 與非同步 Endpoint
當 endpoint 使用 async def 定義時,函式會直接在 event loop 中執行,而不會經過 thread pool。這意味著該函式必須是非阻塞的,否則會影響整個系統的並發能力。
在 async endpoint 中,所有任務共享同一個 event loop。當一個 coroutine 遇到 await 時,會讓出控制權,讓其他任務繼續執行。但如果在這個過程中出現 blocking operation,整個 event loop 會被阻塞,導致所有 request 停滯。
這段程式在語法上是 async,但實際上仍然是同步阻塞的。當 time.sleep() 執行時,整個 event loop 無法處理其他 request,這會嚴重影響系統效能。
from fastapi import FastAPI
import time
app = FastAPI()
@app.post("/job")
async def create_job():
time.sleep(5) # blocking operation,block event loop
return {"status": "done"}即使使用 asyncio.create_task() 將任務放到背景執行,也不適合用來處理長時間且重要的工作。這種方式的問題在於,task 的生命週期仍然依附於 FastAPI process。一旦服務重啟,這些 task 將會直接消失,且沒有任何狀態記錄或重試機制。
import asyncio
from fastapi import FastAPI
app = FastAPI()
async def long_task():
await asyncio.sleep(10)
@app.post("/job")
async def create_job():
asyncio.create_task(long_task())
return {"status": "started"}因此,在 async endpoint 中,處理長任務的原則與同步 endpoint 相同,不應直接執行,而應將任務交由其他 worker process 處理。這樣的設計可以確保 event loop 保持輕量與非阻塞。
常用 API
這裡整理實務中常用的 API。
Process(multiprocessing)
用於建立獨立的執行單位,適合 CPU-bound 任務,並可避開 GIL。
multiprocessing.Process():建立新的 process。Process.start():啟動 process。Process.join():等待 process 執行完成。
Thread(threading)
用於 I/O-bound 任務,適合需要共享記憶體的情境。
threading.Thread():建立新的 thread。Thread.start():啟動 thread。Thread.join():等待 thread 執行完成。threading.Lock():控制多個 thread 存取共享資料。
Async(asyncio)
用於高併發 I/O,透過 event loop 管理任務。
asyncio.run():建立並執行 event loop,此作為 async 程式的入口。asyncio.create_task():將 coroutine 註冊為 task,並由 event loop 排程執行,但不等待結果。asyncio.gather():同時等待多個 coroutine 完成。asyncio.get_running_loop():取得當前 event loop。loop.run_in_executor():將 blocking function 移至 thread pool 執行,以避免阻塞 event loop。
Thread + Async
用於在 multi-thread 環境中執行 async 任務。
asyncio.run():在該 thread 建立並執行 event loop。asyncio.new_event_loop():建立新的 event loop(不自動綁定)。asyncio.set_event_loop():將 event loop 綁定到當前 thread。
結語
Process、Thread 與 Async 分別對應不同層級的並發機制,各自適用於不同的問題。GIL 並不是單純的效能限制,而是影響 thread 行為的重要因素。在使用 FastAPI 時,應該理解其 event loop 的運作方式,並避免在 request lifecycle 中執行長時間任務。透過將任務交由 worker 處理,並適當控制並發數量,可以建立一個穩定且可擴展的系統架構。



