動態規劃(Dynamic Programming, DP)

Photo by Nerfee Mirandilla on Unsplash
Photo by Nerfee Mirandilla on Unsplash
在強化學習(Reinforcement Learning, RL)中,動態規劃(Dynamic Programming, DP,)是最早且最完整的求解方法。雖然 DP 幾乎無法直接應用於實際的高維或連續環境,但它揭示了現代 RL 所有核心概念的數學基礎。所有演算法的收斂目標與更新規則本質上都源自於 DP 所使用的貝爾曼方程(Bellman Equations)與廣義策略迭代(Generalized Policy Iteration, GPI)框架。

在強化學習(Reinforcement Learning, RL)中,動態規劃(Dynamic Programming, DP,)是最早且最完整的求解方法。雖然 DP 幾乎無法直接應用於實際的高維或連續環境,但它揭示了現代 RL 所有核心概念的數學基礎。所有演算法的收斂目標與更新規則本質上都源自於 DP 所使用的貝爾曼方程(Bellman Equations)與廣義策略迭代(Generalized Policy Iteration, GPI)框架。

完整程式碼可以在 下載。

Reinforcement Learning 方法分類

在開始介紹 Dynamic Programming 演算法之前,我們有必要先介紹 Reinforcement Learning(RL)中的三種分類。這些分類是用來描述演算法本身的形式與性質。這三種性質代表著演算法適用情境,而不是代表它們之間的優劣。

Model-Based vs. Model-Free

Model-based 方法假設 agent 已知或能學得完整的環境 model。換言之,agent 能預測每個 action 的 reward 和 transition dynamics。

p(s^\prime, r \mid s, a)

Model-based 方法的特點是:

  • 能直接使用 Bellman equation 中期望值的確切形式。
  • 能進行規劃(planning),即不需要互動(interaction)就可以模擬未來。

Model-free 方法則是,agent 不需要知道 environment model,完全依賴 sample 進行學習。

Model-free 方法的特點是:

  • 無需知道 transition dynamics。
  • 更加接近真實的世界的情況。

Bootstrapping vs. Non-Bootstrapping

Bootstrapping 方法是指,當一個方法更新 value function 時,它使用了其他的估計值來進行更新,而不是使用完整的 return G_t。如下面的式子中,v_{\pi^\prime} 使用 v_\pi(s^\prime) 估計值來更新,因此是 bootstrapping。

\displaystyle v_{\pi^\prime}(s) \leftarrow \sum_a \pi(a \mid s) \sum_{s^\prime, r} p(s^\prime, r \mid s, a) \left[ r + \gamma v_\pi(s^\prime) \right]

Bootstrapping 方法的特點是:

  • 更新迅速。
  • 可能帶入偏差(bias)。因為使用了尚未正確的估計值,更新本身就會偏離真正的期望。

Non-bootstrapping 方法是指,更新 value function 時,是依靠完整的 return G_t,而不使用任何估計值。

v_{\pi^\prime} \leftarrow G_t

Non-bootstrapping 方法的特點是:

  • 無偏差(unbiased)。
  • 高變異度(high variance)。因為完整的 return 受到整段軌跡(trajectory)的隨機噪音影響非常大。

On-Policy vs. Off-Policy

在 RL 中,永遠有兩件事:

  1. 要學習哪個 policy。
    • 我們稱為目標策略(target policy),記作 \pi
    • 你希望得到它的 value functions v_\pi, q_\pi,或直接優化它的參數。
  2. 實際與 environment 互動、產生資料的那個 policy。
    • 我們稱為行為策略(behavior policy),記作 \mu
    • 它決定 agent 在每個 state 實際選哪個 action。

On-policy 方法是指 \pi = \mu,也就是說,要學的 policy 等於用來互動與取樣的 policy。

例如,我們現在用一個 \epsilon-\text{greedy} policy \pi_\epsilon 與 environment 互動,並且收集到互動時產生的樣本、return 等資料。這些都是在 \pi_\epsilon 下產生的資料。然後,我們用這些資料來更新 value functions v_{\pi_\epsilon}, q_{\pi_\epsilon}

Off-policy 方法是指 \pi \neq \mu。也就是說,要學的 policy 不等於用來互動與取樣的 policy。

例如,我們用一個 behavior policy \mu 與 environment 互動,並且收集互動時產生的樣本、return 等資料。然後,我們用這些資料來更新 value function v_\pi, q_\pi,而不是 v_\mu, q_\mu

動態規劃(Dynamic Programming, DP)

動態規劃(Dynamic Programming, DP)是 RL 中最早、最完整、最數學化的求解方式。

DP 是 model-based 方法。它完全依賴 model。它假設我們已經完全知道 environment 的 transition dynamics p(s^\prime, r \mid s, a)。這表示,我們知道從 state s 執行 action a 之後,所有可能的下一個 state s^\prime 以及對應的 reward r。換言之,我們知道狀態轉移的機率分佈。因此 DP 是典型的 planning 方法,它不需要從環境 sample,而是直接對 model 做推理。

DP 是 bootstrapping 方法。因為知道完整 model,因此 DP 可以對所有 states 做全幅更新(full sweep),並且利用 Bellman equation 進行收斂迭代。

DP 是 on-policy 方法。DP 中沒有採樣。它的 prediction 使用 \pi 本身,而它的 improvement 使用 greedy from v_\pi,皆是基於當前 policy \pi

預測(Prediction)

迭代策略評估(Iterative Policy Evaluation)

在 RL 中,我們稱策略評估(policy evaluation)為預測問題(Prediction problem)。Prediction 問題是,給定一個已知 policy \pi,求其 state-value function v_\pi(s)

迭代策略評估(Iterative Policy Evaluation)是 DP 的一個 prediction 方法。它使用 Bellman equation 作為計算的更新規則。在保證 v_\pi 存在的相同條件下,序列 \{ v_k \} 就會隨著 k \rightarrow \infty 收斂至 v_\pi

\displaystyle v_{k+1} \doteq \mathbb{E}_\pi \left[ R_{t+1} + \gamma v_k(S_{t+1}) \mid S_t=s \right] \\\\ \hphantom{v_{k+1}} = \sum_a \pi(a \mid s) \sum_{s^\prime, r} p(s^\prime, r \mid s, a) \left[ r + \gamma v_k(s^\prime) \right]

Iterative policy evaluation 的演算法如下:

Iterative Policy Evaluation (source: Reinforcement Learning: An Introduction, 2nd).
Iterative Policy Evaluation (source: Reinforcement Learning: An Introduction, 2nd).

控制(Control)

在 RL 中,控制問題(control problem)的目標是,透過反覆執行 policy evaluation 和 policy improvement,找到最佳 policy \pi_*,使得對所有 states 接最大化 return。

接下來,我們將介紹 DP 的兩個 control 方法。

策略迭代(Policy Iteration)

DP 的策略迭代(Policy Iteration)的想法是,一但 policy \pi 藉由 v_\pi 獲得了改善,而產生更好的 policy \pi^\prime,我們就可以再計算 v_{\pi^\prime},並再次獲得改善的 \pi^{\prime\prime}。因為一個 finite MDP 只有有限數量的策略,因此此過程最終會收斂至 optimal policy 和 optimal state-value function。

Policy iteration 的演算法如下:

Policy Iteration (source: Reinforcement Learning: An Introduction, 2nd).
Policy Iteration (source: Reinforcement Learning: An Introduction, 2nd).

價值迭代(Value Iteration)

Policy iteration 的一個缺點是,每次迭代過程都牽涉 policy evaluation,而它需要對個 states 進行多次的迭代計算,因此相當耗時。

價值迭代(Value Iteration)將 policy evaluation 和 policy improvement 合併成一個步驟,即使用 Bellman optimality equation 進行更新。對於任意的 v_0,在保證 v_* 存在的相同條件下,序列 \{ v_k \} 可以被證明會收斂至 v_*

\displaystyle v_{k+1} \doteq \max_a \mathbb{E} \left[ R_{t+1} + \gamma v_k(S_{t+1}) \mid S_t=s \right] \\\\ \hphantom{v_{k+1}} = \max_a \sum_{s^\prime, r} p(s^\prime, r \mid s, a) \left[ r + \gamma v_k(s^\prime) \right]

v_k 足夠收斂後,optimal policy \pi_* 為:

\displaystyle \pi_*(s) = \text{argmax}_a \sum_{s^\prime, r} p(s^\prime, r \mid s, a) \left[ r + \gamma v_k(s^\prime) \right]

Value policy 的演算法如下:

Value Iteration (source: Reinforcement Learning: An Introduction, 2nd).
Value Iteration (source: Reinforcement Learning: An Introduction, 2nd).

效率與限制

DP 直接使用 Bellman equation,完全無需取樣,也無任何估計誤差。所以其收斂過程穩定且可預期。然而,DP 的限制非常嚴重,使其在實際 RL 應用中,幾乎無法使用。其主要原因是:

  1. 它需要完整 model。
  2. 它需要對所有 state 進行 full sweep,因此計算量龐大。
  3. 它無法對應連續或高維度 state space。

雖然 DP 無法應用於大型問題,但它仍然是 RL 中不可或缺的部分,因為它提供了其他所有方法的概念基礎。DP 形式的 Bellman equation 是其他所有方法的基礎。

範例

實作

在以下的程式碼中,我們忠實地實作 DP 的兩個 control 方法,policy iteration 和 value iteration。

程式碼中的 envGymnasiumEnv 物件。Gymnasium 的前身是 OpenAI Gym。它提供了眾多的 RL 環境的數據,讓我們可以在模擬的環境中,測試 RL 演算法。

Environment 中有 env.observation_space.n 個 states。因此,state 的編號就從整數 0 到 env.observation_space.n - 1。變數 V 是一個浮點數的 1D-array,每一個 element 都是一個 state 的 state-value 的估計值。例如,V[0] 是 state 0 的 state-value 估計值。

每個 state 有 env.action_space.n 個 actions。因此,action 的編號就從整數 0 到 env.action_space.n - 1。Policy pi 是一個 [env.observation_space.n][env.action_space.n] 的浮點數 2D-array。每個 pi[s][a] 都是一個 \left[ 0,1 \right] 的機率。每個 pi[s] 下所有動作的機率和為 1。

Environment 的 env.unwrapped.P 是 transition dynamics。每一個 env.unwrapped.P[s][a] 包含著其下一個 state、reward、transition probability、以下一個 state 是否是 terminal state。

class DynamicProgramming:
    def __init__(self, env: gym.Env, gamma=0.9, theta=1e-9):
        self.env = env
        self.gamma = gamma
        self.theta = theta

    def _policy_evaluation(self, pi: np.ndarray, V: np.ndarray) -> np.ndarray:
        while True:
            delta = 0
            for s in range(self.env.observation_space.n):
                v = 0
                for a, pi_as in enumerate(pi[s]):
                    q_sa = 0
                    for p, s_prime, r, terminated in self.env.unwrapped.P[s][a]:
                        q_sa += p * (r + self.gamma * V[s_prime] * (not terminated))
                    v += pi_as * q_sa
                delta = max(delta, np.abs(v - V[s]))
                V[s] = v

            if delta < self.theta:
                break

        return V

    def _policy_improvement(self, pi: np.ndarray, V: np.ndarray) -> tuple[np.ndarray, bool]:
        policy_stable = True

        for s in range(self.env.observation_space.n):
            old_a = np.argmax(pi[s])
            pi[s] = self._greedify_policy(V, s)
            new_a = np.argmax(pi[s])

            if old_a != new_a:
                policy_stable = False

        return pi, policy_stable

    def _greedify_policy(self, V: np.ndarray, s: int) -> np.ndarray:
        q_s = np.zeros(self.env.action_space.n)
        for a in range(self.env.action_space.n):
            for p, s_prime, r, terminated in self.env.unwrapped.P[s][a]:
                q_s[a] += p * (r + self.gamma * V[s_prime] * (not terminated))

        new_a = np.argmax(q_s)
        pi_s = np.eye(self.env.action_space.n)[new_a]
        return pi_s

    def policy_iteration(self) -> tuple[np.ndarray, np.ndarray]:
        V = np.zeros(self.env.observation_space.n)
        pi = np.ones((self.env.observation_space.n, self.env.action_space.n)) / self.env.action_space.n
        policy_stable = False

        while not policy_stable:
            V = self._policy_evaluation(pi, V)
            pi, policy_stable = self._policy_improvement(pi, V)

        return pi, V

    def value_iteration(self) -> tuple[np.ndarray, np.ndarray]:
        V = np.zeros(self.env.observation_space.n)

        while True:
            delta = 0
            for s in range(self.env.observation_space.n):
                q_s = np.zeros(self.env.action_space.n)
                for a in range(self.env.action_space.n):
                    for p, s_prime, r, terminated in self.env.unwrapped.P[s][a]:
                        q_s[a] += p * (r + self.gamma * V[s_prime] * (not terminated))

                new_a = np.max(q_s)
                delta = max(delta, np.abs(new_a - V[s]))
                V[s] = new_a

            if delta < self.theta:
                break

        pi = np.zeros((self.env.observation_space.n, self.env.action_space.n))
        for s in range(self.env.observation_space.n):
            pi[s] = self._greedify_policy(V, s)

        return pi, V

Cliff Walking

Cliff Walking 涉及在一個網格世界(grid world)中,從起始位置移動到目標位置,同時避免掉落懸崖,如下圖。

Cliff Walking.
Cliff Walking.

Cliff Walking 問題中每個 state 就是玩家的目前的 position。因此共有 36 個 states,分別是上面三排加上左下的格子,3 * 12 + 1 = 36。 目前的 position 由以下公式算出:

\text{position} = \text{current-row} \times \text{num-col} + \text{current-col}

有四個 actions,分別是:

  • 0:Move up
  • 1:Move right
  • 2:Move down
  • 3:Move left

每一步皆給予 −1 reward。但若玩家踏入懸崖(cliff),則會受到 −100 reward。

import time

import gymnasium as gym
import numpy as np

from dp import DynamicProgramming

GYM_ID = "CliffWalking-v1"


def play_game(policy, episodes=1):
    visual_env = gym.make(GYM_ID, render_mode="human")

    for episode in range(episodes):
        state, _ = visual_env.reset()
        terminated = False
        truncated = False
        total_reward = 0
        step_count = 0

        print(f"Episode {episode + 1} starts")
        while not terminated and not truncated:
            action = np.argmax(policy[state])
            state, reward, terminated, truncated, _ = visual_env.step(action)
            total_reward += reward
            step_count += 1
            time.sleep(0.3)

        print(f"Episode {episode + 1} is finished: Total reward is {total_reward}, steps = {step_count}")
        time.sleep(1)

    visual_env.close()


if __name__ == "__main__":
    env = gym.make(GYM_ID)
    dp = DynamicProgramming(env)

    print(f"Gym environment: {GYM_ID}")

    print("Start DP Policy Iteration")
    pi_policy, pi_V = dp.policy_iteration()
    play_game(pi_policy)

    print("\n")

    print("Start DP Value Iteration")
    vi_policy, vi_V = dp.value_iteration()
    play_game(vi_policy)

    env.close()

以下為上述程式的執行結果。可以觀察到,agent 在學習完成後,會選擇沿著懸崖邊緣行走,並以最短路徑抵達終點,反映其在回報最大化目標下所學得的最優策略。

Cliff Walking by Dynamic Programming.
Cliff Walking by Dynamic Programming.

Taxi

Taxi 問題是在一個 grid world 中移動,前往乘客所在的位置接送乘客,並將乘客送達四個指定地點之一,如下圖。

Taxi problem.
Taxi problem.

Taxi 問題中有 500 個 discrete states。每個 state 由以下公式算出:

r: \text{the row the taxi is currently at} \\\\ c: \text{the colunm the taxi is currently at} \\\\ p: \text{the location the passenger is at} \\\\ d: \text{the destination to drop off the passenger} \\\\ state = ((r \times 5 + c) \times 5 + p) * 4 + d

Passenger 的 locations:

  • 0:Red
  • 1:Green
  • 2:Yellow
  • 3:Blue
  • 4:In taxi

Destinations:

  • 0:Red
  • 1:Green
  • 2:Yellow
  • 3:Blue

有六個 actions,分別是:

  • 0:Move south(down)
  • 1:Move north(up)
  • 2:Move east(right)
  • 3:Move west(left)
  • 4:Pickup passenger
  • 5:Drop off passenger

每一步皆給予 −1 reward,除非觸發其他 reward。成功將乘客送達目的地可獲得 +20 reward。非法執行 pickup 或 drop-off 動作時,會受到 −10 reward。

import time

import gymnasium as gym
import numpy as np

from dp import DynamicProgramming

GYM_ID = "Taxi-v3"


def play_game(policy, episodes=1):
    visual_env = gym.make(GYM_ID, render_mode="human")

    for episode in range(episodes):
        state, _ = visual_env.reset()
        terminated = False
        truncated = False
        total_reward = 0
        step_count = 0

        print(f"Episode {episode + 1} starts")
        while not terminated and not truncated:
            action = np.argmax(policy[state])
            state, reward, terminated, truncated, _ = visual_env.step(action)
            total_reward += reward
            step_count += 1
            time.sleep(0.3)

        print(f"Episode {episode + 1} is finished: Total reward is {total_reward}, steps = {step_count}")
        time.sleep(1)

    visual_env.close()


if __name__ == "__main__":
    env = gym.make(GYM_ID)
    dp = DynamicProgramming(env)

    print(f"Gym environment: {GYM_ID}")

    print("Start DP Policy Iteration")
    pi_policy, pi_V = dp.policy_iteration()
    play_game(pi_policy)

    print("\n")

    print("Start DP Value Iteration")
    vi_policy, vi_V = dp.value_iteration()
    play_game(vi_policy)

    env.close()

以下是上面程式的執行結果。

Taxi by Dynamic Programming.
Taxi by Dynamic Programming.

結語

DP 透過明確的 Bellman equation、完整的 model 資訊與系統性的 sweeps,提供了最純粹、最數學化的 policy evaluation 與 policy improvement 方式。儘管 DP 需要完整的環境 model 並且必須遍歷整個 state space,使其無法直接應用於複雜的真實環境,但 DP 的價值並不在於實作,而在於它所奠定的概念基礎。因此,儘管 DP 本身不適合用於大型問題,它仍是理解所有後續 RL 演算法的基礎。

參考

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

You May Also Like