Practical session 2: Bandits

Preliminaries

Let's start by loading all the required libraries for this notebook.

In [19]:
import time
import numpy as np
from random import choice
from typing import Sequence, Tuple
import matplotlib.pyplot as plt
%matplotlib inline
np.random.seed(64) # Fixing the seed for reproducibility
import warnings
warnings.filterwarnings('ignore')

Setup

Phase I trials are the first stage of testing a treatment in human subjects. Their goal is to evaluate the safety (and feasibility) of the treatment and to identify its side effects. The aim of a Phase I dose-finding study is to determine the most appropriate dose level to use in later phases of the clinical trials. Traditionally, the focus is on determining the highest dose with acceptable toxicity, called the Maximum Tolerated Dose (MTD).

A dose-finding study involves a number $K$ of dose levels that have been chosen by physicians based on preliminary experiments ($K$ is usually between 3 and 10). Denoting by $p_k$ the (unknown) toxicity probability of dose $k$, the Maximum Tolerated Dose (MTD) is defined as the dose with a toxicity probability closest to a target:

\begin{align} k^* \in \underset{k\in\{1,\dots,K\}}{\mathrm{argmin}}|\theta - p_k| \end{align}

where $\theta$ is the pre-specified targeted toxicity probability (typically between 0.2 and 0.35). An MTD identification algorithm proceeds sequentially: at round $t$, a dose $D_t \in \{1, \dots , K\}$ is selected and administered to a patient, for whom a toxicity response is observed. A binary outcome $X_t$ is revealed, where $X_t = 1$ indicates that a harmful side effect occurred and $X_t = 0$ that it did not. We assume that $X_t$ is drawn from a Bernoulli distribution with mean $p_{D_t}$, independently from previous observations.

Hint: In this example, the reward definition differs a bit from the usual bandit setting: we would like to select the arm with the smallest $|\theta - \hat{p}_k|$, where $\hat{p}_k$ is the estimated toxicity probability of arm $k$.
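As a purely illustrative example (these are not the probabilities used later in this notebook), suppose four candidate doses have toxicity probabilities $p = (0.05, 0.25, 0.45, 0.70)$ and the target is $\theta = 0.3$. The distances $|\theta - p_k|$ are $(0.25, 0.05, 0.15, 0.40)$, so the MTD is the second dose. The same computation with the NumPy import above:

p = np.array([0.05, 0.25, 0.45, 0.70])  # hypothetical toxicity probabilities
theta = 0.3                             # target toxicity
mtd = np.argmin(np.abs(theta - p))      # index 1, i.e. the second dose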

The MultiArmedBandit environment

The bandit environment (the Bandit class below) takes as parameters:

  1. the number of arms (n_arm) and the number of pulls (n_pulls)
  2. the actual (unknown) toxicity probability of each arm (actual_toxicity_prob), from which the binary toxicity outcomes are sampled
  3. the targeted toxicity probability (theta), used to compute the reward

Task 1: Define your Bandit class:

Most of the class has been written. Complete the pull method so that it:

1. updates both the num_dose_selected and num_toxic arrays,

2. computes and returns the reward $-|\theta - \hat{p}_k|$, where $\hat{p}_k$ is the estimated toxicity probability of arm $k$.

In [20]:
class Bandit(object):

  def __init__(self,
               n_arm: int = 2,
               n_pulls: int = 2000,
               actual_toxicity_prob: list = [0.4, 0.6],
               theta: float = 0.3,
               ):
    self.n_arm = n_arm
    self.n_pulls = n_pulls
    self.actual_toxicity_prob = actual_toxicity_prob
    self.theta = theta
    self.init_bandit()

  def init_bandit(self):
    """
      Pessimistic initialization
    """
    self.num_dose_selected = np.array([0]*self.n_arm) # number of times a dose is selected
    self.num_toxic = np.array([0]*self.n_arm) # number of times a dose is found to be toxic

  def pull(self, a_idx: int):
    """
    .inputs:
      a_idx: Index of action.
    .outputs:
      rew: reward value.
    """
    assert a_idx < self.n_arm, "invalid action index"
    # ----------------------------------------------
    ...
    # ----------------------------------------------
    return rew
In [ ]:
### Solution:
class Bandit(object):
    def __init__(self,
                 n_arm: int = 2,
                 n_pulls: int = 2000,
                 actual_toxicity_prob: list = [0.4, 0.6],
                 theta: float = 0.3,
                 ):
        self.n_arm = n_arm
        self.n_pulls = n_pulls
        self.actual_toxicity_prob = actual_toxicity_prob
        self.theta = theta
        self.init_bandit()

    def init_bandit(self):
        """
        Initialize the bandit
        """
        self.num_dose_selected = np.array([0] * self.n_arm)  # number of times a dose is selected
        self.num_toxic = np.array([0] * self.n_arm)  # number of times a dose is found to be toxic 

    def pull(self, a_idx: int):
        """
        .inputs:
          a_idx: Index of action.
        .outputs:
          rew: reward value.
        """
        assert a_idx < self.n_arm, "invalid action index"
        
        ### Sample the binary toxicity outcome for the selected dose
        toxicity_outcome = np.random.binomial(1, self.actual_toxicity_prob[a_idx]) == 1

        ### Update the counters
        self.num_dose_selected[a_idx] += 1
        if toxicity_outcome:
            self.num_toxic[a_idx] += 1  # record a toxic outcome for this dose
        
        ### Estimate the action value
        estimated_toxicity_prob = np.where(
            self.num_dose_selected > 0,
            self.num_toxic / self.num_dose_selected,
            0  # if a dose has never been selected, assume zero observed toxicity
        )
        
        # Compute reward 
        reward = -abs(self.theta - estimated_toxicity_prob[a_idx])
        return reward
    

# Test
bandit = Bandit()
reward = bandit.pull(1)
print(f"Reward: {reward}")
Reward: -0.7
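Here the single pull of arm 1 (actual toxicity probability 0.6) happened to be toxic, so the estimated toxicity is $\hat{p}_1 = 1/1 = 1$ and the reward is $-|0.3 - 1| = -0.7$; a non-toxic outcome would instead have given $\hat{p}_1 = 0$ and a reward of $-0.3$.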
In [22]:
# Setup 3 dose levels
bandit = Bandit(n_arm=3, actual_toxicity_prob=[0.2, 0.35, 0.5], theta=0.3)

# Pull the second dose (index 1)
reward = bandit.pull(1)
print(f"Reward: {reward}")
Reward: -0.3

Fixing the setting for the rest of the practical session

Let's define a dose-finding study with three doses ($K = 3$) to choose from, with actual_toxicity_prob=[0.1, 0.35, 0.8] and target toxicity probability $\theta = 0.3$.

In [23]:
### Problem definition
bandit = Bandit(n_arm=3, n_pulls=2000, actual_toxicity_prob=[0.1, 0.35, 0.8], theta=0.3)
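Throughout the rest of the session we also track the cumulative regret. Since the expected reward of arm $k$ is $-|\theta - p_k|$, the per-step regret of selecting dose $D_t$ (as computed in the solution code below) is

\begin{align} \mathrm{regret}_t = |\theta - p_{D_t}| - |\theta - p_{k^*}| \geq 0, \end{align}

and the total regret up to round $T$ is the sum of these per-step terms.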

Q1.b: $\epsilon$-greedy for k-armed bandit and Optimistic initial values

Q1.b1: $\epsilon$-greedy algorithm implementation

Implement the $\epsilon$-greedy method.
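Concretely, at each round the solution below draws a uniform random number: with probability $\epsilon$ it selects an arm uniformly at random (exploration), and otherwise it selects the arm whose estimated toxicity probability is closest to the target, i.e. $\underset{k}{\mathrm{argmin}}\,|\theta - \hat{p}_k|$ (exploitation).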

In [24]:
def eps_greedy(
    bandit: Bandit,
    eps: float,
    init_q: float = .0
    ) -> Tuple[list, list, list, list]:
  """
  .inputs:
    bandit: A bandit problem, instantiated from the above class.
    eps: The epsilon value.
    init_q: Initial estimation of each arm's value.
  .outputs:
    rew_record: The record of rewards at each timestep.
    avg_ret_record: The average of rewards up to step t, where t goes from 0 to n_pulls. For example: if
    we define `ret_T` = \sum^T_{t=0}{r_t}, then `avg_ret_record[T]` = ret_T / (1+T).
    tot_reg_record: The cumulative regret up to step t, where t goes from 0 to n_pulls.
    opt_action_perc_record: Percentage of optimal arm selected.
  """
  # initialize q values
  q = np.array([init_q]*bandit.n_arm, dtype=float)

  ret = .0
  rew_record = []
  avg_ret_record = []
  tot_reg_record = []
  opt_action_perc_record = []

  for t in range(bandit.n_pulls):
    # ----------------------------------------------
    ...
    # ----------------------------------------------

  return rew_record, avg_ret_record, tot_reg_record, opt_action_perc_record
In [25]:
### Solution
def eps_greedy(bandit, eps, init_q=0.0):
    """
    Perform epsilon-greedy strategy on the given bandit.
    .inputs:
      bandit: Bandit object
      eps: Epsilon value for exploration
      init_q: Initial value for the estimated toxicity probability of each arm
    .outputs:
      rew_rec: list of rewards at each step
      avg_ret_rec: list of average returns at each step
      tot_reg_rec: list of cumulative regrets at each step
      opt_act_rec: list of percentage optimal actions at each step
    """
    rew_rec = []
    avg_ret_rec = []
    tot_reg_rec = []
    opt_act_rec = []

    n_pulls = bandit.n_pulls
    # Convert actual toxicity probabilities to a NumPy array for numerical operations
    actual_toxicity_prob = np.array(bandit.actual_toxicity_prob)
    opt_arm = np.argmin(np.abs(actual_toxicity_prob - bandit.theta))

    cumulative_reward = 0
    optimal_action_count = 0
    cumulative_regret = 0

    # Initialize estimated toxicity probabilities (init_q = 0 gives the standard zero initialization)
    estimated_probs = np.full(bandit.n_arm, init_q, dtype=float)

    for t in range(n_pulls):
        # Exploration vs Exploitation
        if np.random.rand() < eps:  # Exploration
            arm = np.random.randint(0, bandit.n_arm)
        else:  # Exploitation
            arm = np.argmin(np.abs(estimated_probs - bandit.theta))

        # Pull the arm and get the reward
        reward = bandit.pull(arm)

        # Update cumulative reward and regret
        cumulative_reward += reward
        if arm == opt_arm:
            optimal_action_count += 1

        regret = abs(bandit.theta - actual_toxicity_prob[arm]) - abs(bandit.theta - actual_toxicity_prob[opt_arm])  # per-step regret is non-negative
        cumulative_regret += regret

        # Update estimated probabilities using running average
        if bandit.num_dose_selected[arm] > 0:
            estimated_probs[arm] = bandit.num_toxic[arm] / bandit.num_dose_selected[arm]

        # Record metrics
        rew_rec.append(reward)
        avg_ret_rec.append(cumulative_reward / (t + 1))
        tot_reg_rec.append(cumulative_regret)
        opt_act_rec.append(optimal_action_count / (t + 1))

    return rew_rec, avg_ret_rec, tot_reg_rec, opt_act_rec

Q1.b2: Plotting the results

Use the driver code provided to plot:

(1) the average return,

(2) the reward,

(3) the total regret, and

(4) the percentage of optimal actions,

averaged across the $N$=20 runs, as a function of the number of pulls (2000 pulls per run), for the three $\epsilon$ values 0.5, 0.1, and 0.

In [26]:
plt.figure(0)
plt.xlabel("n pulls")
plt.ylabel("avg return")
plt.figure(1)
plt.xlabel("n pulls")
plt.ylabel("reward")
plt.figure(2)
plt.xlabel("n pulls")
plt.ylabel("total regret")
plt.figure(3)
plt.xlabel("n pulls")
plt.ylabel("% optimal action")

N = 20
tot_reg_rec_best = 1e8

for eps in [0.5, 0.1, .0]:
  rew_rec = np.zeros(bandit.n_pulls)
  avg_ret_rec = np.zeros(bandit.n_pulls)
  tot_reg_rec = np.zeros(bandit.n_pulls)
  opt_act_rec = np.zeros(bandit.n_pulls)
  start_time = time.time()
  for n in range(N):
    bandit.init_bandit()
    rew_rec_n, avg_ret_rec_n, tot_reg_rec_n, opt_act_rec_n = eps_greedy(bandit, eps)
    rew_rec += np.array(rew_rec_n)
    avg_ret_rec += np.array(avg_ret_rec_n)
    tot_reg_rec += np.array(tot_reg_rec_n)
    opt_act_rec += np.array(opt_act_rec_n)

  end_time = time.time()
  # print(f"time per run: {end_time - start_time}/N")
  # take the mean
  rew_rec /= N
  avg_ret_rec /= N
  tot_reg_rec /= N
  opt_act_rec /= N

  plt.figure(0)
  plt.plot(avg_ret_rec, label="eps={}".format(eps))
  plt.legend(loc="lower right")

  plt.figure(1)
  plt.plot(rew_rec[1:], label="eps={}".format(eps))
  plt.legend(loc="lower right")

  plt.figure(2)
  plt.plot(tot_reg_rec, label="eps={}".format(eps))
  plt.legend(loc="lower right")

  plt.figure(3)
  plt.plot(opt_act_rec, label="eps={}".format(eps))
  plt.legend(loc="lower right")


  # keep the records of the epsilon value with the lowest final regret
  if tot_reg_rec[-1] < tot_reg_rec_best:
    ep_greedy_dict = {
        'opt_act': opt_act_rec,
        'regret_list': tot_reg_rec,
    }
    tot_reg_rec_best = tot_reg_rec[-1]
[Figures: average return, reward, total regret, and % optimal actions vs. number of pulls, for $\epsilon$ = 0.5, 0.1, and 0]

Analysis:

Explain the results from the perspective of exploration and how different $\epsilon$ values affect the results.

Q1.b4: Optimistic Initial Values

We want to run the optimistic initial value method on the same problem described above for the initial q values of -1 and +1 for all arms. Compare its performance, measured by the average reward across $N$=20 runs as a function of the number of pulls, with the non-optimistic setting with initial q values of 0 for all arms. For both optimistic and non-optimistic settings, $\epsilon$=0.

In [ ]:
plt.figure(4)
plt.xlabel("n pulls")
plt.ylabel("avg return")

plt.figure(5)
plt.xlabel("n pulls")
plt.ylabel("reward")

plt.figure(6)
plt.xlabel("n pulls")
plt.ylabel("total regret")

N = 20
for init_q in [-1, 1]:
  rew_rec = np.zeros(bandit.n_pulls)
  avg_ret_rec = np.zeros(bandit.n_pulls)
  tot_reg_rec = np.zeros(bandit.n_pulls)
  for n in range(N):
    bandit.init_bandit()
    rew_rec_n, avg_ret_rec_n, tot_reg_rec_n, opt_act_rec_n = eps_greedy(bandit, eps=0.0, init_q=init_q)

    rew_rec += np.array(rew_rec_n)
    avg_ret_rec += np.array(avg_ret_rec_n)
    tot_reg_rec += np.array(tot_reg_rec_n)

  avg_ret_rec /= N
  rew_rec /= N
  tot_reg_rec /= N
  plt.figure(4)
  plt.plot(avg_ret_rec[1:], label="q_init_val={}".format(init_q))
  plt.legend(loc="lower right")

  plt.figure(5)
  plt.plot(rew_rec[1:], label="q_init_val={}".format(init_q))
  plt.legend(loc="lower right")

  plt.figure(6)
  plt.plot(tot_reg_rec[1:], label="q_init_val={}".format(init_q))
  plt.legend(loc="lower right")
In [28]:
### Solution

def optimism(bandit, eps, init_q):
    """
    Perform epsilon-greedy strategy with optional optimistic initial values.
    .inputs:
      bandit: Bandit object
      eps: Epsilon value for exploration
      init_q: Initial value for the estimated toxicity probabilities (optimistic or non-optimistic)
    .outputs:
      rew_rec: list of rewards at each step
      avg_ret_rec: list of average returns at each step
      tot_reg_rec: list of cumulative regrets at each step
      opt_act_rec: list of percentage optimal actions at each step
    """
    rew_rec = []
    avg_ret_rec = []
    tot_reg_rec = []
    opt_act_rec = []

    n_pulls = bandit.n_pulls
    actual_toxicity_prob = np.array(bandit.actual_toxicity_prob)
    opt_arm = np.argmin(np.abs(actual_toxicity_prob - bandit.theta))

    cumulative_reward = 0
    optimal_action_count = 0
    cumulative_regret = 0

    # Initialize estimated probabilities with optimistic or non-optimistic values
    estimated_probs = np.full(bandit.n_arm, init_q, dtype=float)  # float dtype so later updates are not truncated

    for t in range(n_pulls):
        # Exploration vs Exploitation
        if np.random.rand() < eps:  # Exploration
            arm = np.random.randint(0, bandit.n_arm)
        else:  # Exploitation
            arm = np.argmin(np.abs(estimated_probs - bandit.theta))

        # Pull the arm and get the reward
        reward = bandit.pull(arm)

        # Update cumulative reward and regret
        cumulative_reward += reward
        if arm == opt_arm:
            optimal_action_count += 1

        regret = abs(bandit.theta - actual_toxicity_prob[arm]) - abs(bandit.theta - actual_toxicity_prob[opt_arm])  # per-step regret is non-negative
        cumulative_regret += regret

        # Update estimated probabilities using running average
        if bandit.num_dose_selected[arm] > 0:
            estimated_probs[arm] = bandit.num_toxic[arm] / bandit.num_dose_selected[arm]

        # Record metrics
        rew_rec.append(reward)
        avg_ret_rec.append(cumulative_reward / (t + 1))
        tot_reg_rec.append(cumulative_regret)
        opt_act_rec.append(optimal_action_count / (t + 1))

    return rew_rec, avg_ret_rec, tot_reg_rec, opt_act_rec
In [29]:
plt.figure(4)
plt.xlabel("n pulls")
plt.ylabel("avg return")

plt.figure(5)
plt.xlabel("n pulls")
plt.ylabel("reward")

plt.figure(6)
plt.xlabel("n pulls")
plt.ylabel("total regret")

N = 20
for init_q in [-1, 1, 0]:  # Testing optimistic (+1), pessimistic (-1), and non-optimistic (0)
    rew_rec = np.zeros(bandit.n_pulls)
    avg_ret_rec = np.zeros(bandit.n_pulls)
    tot_reg_rec = np.zeros(bandit.n_pulls)

    for n in range(N):
        bandit.init_bandit()
        rew_rec_n, avg_ret_rec_n, tot_reg_rec_n, opt_act_rec_n = optimism(bandit, eps=0.0, init_q=init_q)

        rew_rec += np.array(rew_rec_n)
        avg_ret_rec += np.array(avg_ret_rec_n)
        tot_reg_rec += np.array(tot_reg_rec_n)

    avg_ret_rec /= N
    rew_rec /= N
    tot_reg_rec /= N

    # Plot average return
    plt.figure(4)
    plt.plot(avg_ret_rec, label="q_init_val={}".format(init_q))
    plt.legend(loc="lower right")

    # Plot rewards
    plt.figure(5)
    plt.plot(rew_rec, label="q_init_val={}".format(init_q))
    plt.legend(loc="lower right")

    # Plot total regret
    plt.figure(6)
    plt.plot(tot_reg_rec, label="q_init_val={}".format(init_q))
    plt.legend(loc="lower right")

plt.show()
[Figures: average return, reward, and total regret vs. number of pulls, for initial q values -1, +1, and 0]

Analysis

Explain how initial q values affect the exploration and the performance.

Upper confidence bound arm selection

Q1.c: Upper-Confidence-Bound action selection

Q1.c1: UCB algorithm implementation

Implement the UCB algorithm on the same MAB problem as above.
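As a reminder, the solution below uses the standard UCB1-style selection rule on the reward estimates: after pulling each arm once, it selects

\begin{align} D_t \in \underset{k\in\{1,\dots,K\}}{\mathrm{argmax}} \left[ Q_t(k) + c\sqrt{\frac{\ln(t+1)}{N_t(k)}} \right], \end{align}

where $Q_t(k)$ is the current estimate of arm $k$'s mean reward, $N_t(k)$ is the number of times arm $k$ has been pulled so far, and $c \geq 0$ controls the size of the exploration bonus ($c = 0$ reduces to greedy selection on $Q_t$).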

In [30]:
def ucb(
    bandit: Bandit,
    c: float,
    init_q: float = .0
    ) -> Tuple[list, list, list, list]:
  """
  .inputs:
    bandit: A bandit problem, instantiated from the above class.
    c: The additional term coefficient.
    init_q: Initial estimation of each arm's value.
  .outputs:
    rew_record: The record of rewards at each timestep.
    avg_ret_record: The average of rewards up to step t, where t goes from 0 to n_pulls. For example: if
    we define `ret_T` = \sum^T_{t=0}{r_t}, then `avg_ret_record[T]` = ret_T / (1+T).
    tot_reg_record: The cumulative regret up to step t, where t goes from 0 to n_pulls.
    opt_action_perc_record: Percentage of optimal arm selected.
  """
  # init q values (the estimates)
  q = np.array([init_q]*bandit.n_arm, dtype=float)

  ret = .0
  rew_record = []
  avg_ret_record = []
  tot_reg_record = []
  opt_action_perc_record = []

  for t in range(bandit.n_pulls):
    # Note: with no exploration bonus (c = 0), argmax ties are broken by always taking the first arm
    # ----------------------------------------------
    ...
    # ----------------------------------------------


  return rew_record, avg_ret_record, tot_reg_record, opt_action_perc_record
In [31]:
### Solution

def ucb(
    bandit: Bandit,
    c: float,
    init_q: float = 0.0
) -> Tuple[list, list, list, list]:
    """
    .inputs:
      bandit: A bandit problem, instantiated from the above class.
      c: The exploration coefficient.
      init_q: Initial estimation of each arm's value.
    .outputs:
      rew_record: The record of rewards at each timestep.
      avg_ret_record: The average summation of rewards up to step t, where t goes from 0 to n_pulls.
      tot_reg_record: The regret up to step t, where t goes from 0 to n_pulls.
      opt_action_perc_record: Percentage of optimal arm selected.
    """
    # Initialize Q values, counts, and metrics
    q = np.array([init_q] * bandit.n_arm, dtype=float)  # Estimated values of each arm
    n_a = np.zeros(bandit.n_arm)  # Number of times each arm has been pulled
    ret = 0.0  # Cumulative reward
    rew_record = []  # Reward at each time step
    avg_ret_record = []  # Average return at each time step
    tot_reg_record = []  # Cumulative regret at each time step
    opt_action_perc_record = []  # Percentage of optimal actions

    # Determine the optimal arm
    actual_toxicity_prob = np.array(bandit.actual_toxicity_prob)
    opt_arm = np.argmin(np.abs(actual_toxicity_prob - bandit.theta))

    optimal_action_count = 0  # Counter for the number of times the optimal arm is selected
    cumulative_regret = 0.0  # Cumulative regret

    for t in range(bandit.n_pulls):
        # Select the arm using the UCB formula
        if t < bandit.n_arm:
            # Initially pull each arm once to ensure they all have counts > 0
            arm = t
        else:
            # Compute UCB values
            ucb_values = q + c * np.sqrt(np.log(t + 1) / (n_a + 1e-5))
            arm = np.argmax(ucb_values)

        # Pull the selected arm and observe the reward
        reward = bandit.pull(arm)

        # Update the counts and Q values for the selected arm
        n_a[arm] += 1
        q[arm] += (reward - q[arm]) / n_a[arm]

        # Update cumulative reward and regret
        ret += reward
        regret = abs(bandit.theta - actual_toxicity_prob[arm]) - abs(bandit.theta - actual_toxicity_prob[opt_arm])  # per-step regret is non-negative
        cumulative_regret += regret

        # Track optimal arm selection
        if arm == opt_arm:
            optimal_action_count += 1

        # Record metrics
        rew_record.append(reward)
        avg_ret_record.append(ret / (t + 1))
        tot_reg_record.append(cumulative_regret)
        opt_action_perc_record.append(optimal_action_count / (t + 1))

    return rew_record, avg_ret_record, tot_reg_record, opt_action_perc_record


rew_rec, avg_ret_rec, tot_reg_rec, opt_act_rec = ucb(bandit, c=2.0, init_q=0.0)

Q1.c2: Plotting the results

Use the driver code provided to plot: (1) the average return, (2) the reward, (3) the total regret, and (4) the percentage of optimal actions, averaged across the $N$=20 runs, as a function of the number of pulls (2000 pulls per run), for the three values $c$ = 0, 0.5, and 2.0.

In [32]:
plt.figure(7)
plt.xlabel("n pulls")
plt.ylabel("avg return")
plt.figure(8)
plt.xlabel("n pulls")
plt.ylabel("reward")
plt.figure(9)
plt.xlabel("n pulls")
plt.ylabel("total regret")
plt.figure(10)
plt.xlabel("n pulls")
plt.ylabel("% optimal action")

N = 20
tot_reg_rec_best = 1e8
for c in [.0, 0.5, 2]:
  rew_rec = np.zeros(bandit.n_pulls)
  avg_ret_rec = np.zeros(bandit.n_pulls)
  tot_reg_rec = np.zeros(bandit.n_pulls)
  opt_act_rec = np.zeros(bandit.n_pulls)

  for n in range(N):
    bandit.init_bandit()
    rew_rec_n, avg_ret_rec_n, tot_reg_rec_n, opt_act_rec_n = ucb(bandit, c)
    rew_rec += np.array(rew_rec_n)
    avg_ret_rec += np.array(avg_ret_rec_n)
    tot_reg_rec += np.array(tot_reg_rec_n)
    opt_act_rec += np.array(opt_act_rec_n)

  # take the mean
  rew_rec /= N
  avg_ret_rec /= N
  tot_reg_rec /= N
  opt_act_rec /= N

  plt.figure(7)
  plt.plot(avg_ret_rec, label="c={}".format(c))
  plt.legend(loc="lower right")

  plt.figure(8)
  plt.plot(rew_rec, label="c={}".format(c))
  plt.legend(loc="lower right")

  plt.figure(9)
  plt.plot(tot_reg_rec, label="c={}".format(c))
  plt.legend(loc="lower right")

  plt.figure(10)
  plt.plot(opt_act_rec, label="c={}".format(c))
  plt.legend(loc="lower right")

  # keep the records of the c value with the lowest final regret
  if tot_reg_rec[-1] < tot_reg_rec_best:
    ucb_dict = {
        'opt_act': opt_act_rec,
        'regret_list': tot_reg_rec,
    }
    tot_reg_rec_best = tot_reg_rec[-1]
[Figures: average return, reward, total regret, and % optimal actions vs. number of pulls, for $c$ = 0, 0.5, and 2]

Q1.c3: Analysis

Explain the results from the perspective of exploration and how different $c$ values affect the results.