Active Learning Classification

Author
Published

May 8, 2023

Active Learning

Active learning is a machine learning technique that involves iteratively selecting the most informative samples from a large unlabeled dataset and adding them to a smaller labeled dataset. The goal of active learning is to reduce the amount of labeled data required for training while maintaining or improving the performance of a machine learning model.

The active learning process typically involves the following steps:

  • Initialize the learning algorithm

  • Select unlabeled samples

  • Request labels

  • Incorporate labeled samples

  • Evaluate model performance

Active learning has been applied in various domains and has proven effective in scenarios where labeled data is limited or costly to obtain, such as text classification and image recognition.

# import necessary libraries

# libraries for data visualization
import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt
plt.rcParams['animation.ffmpeg_path'] = '/usr/bin/ffmpeg'

# libraries for data loading and processing
import numpy as np
import pandas as pd
from copy import deepcopy

# libraries for data splitting, models and evaluation metrics
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score

# libraries for displaying animation
import matplotlib.animation as animation
from IPython import display

# library for ignoring warnings
import warnings
warnings.filterwarnings('ignore')
# Seed NumPy's global RNG so the synthetic dataset is reproducible.
np.random.seed(0)

# 500 points drawn uniformly from the square [-2, 2) x [-2, 2).
x = 4 * np.random.rand(500) - 2
y = 4 * np.random.rand(500) - 2
fig = px.scatter(x=x, y=y, width=600, height=600)
fig.show()

# Label is 1 when both coordinates share a sign (x * y > 0), otherwise 0.
# The frame is built in one vectorized step: concatenating a one-row frame per
# point re-copies the whole frame on every iteration, which is quadratic.
df = pd.DataFrame({"x": x, "y": y, "val": (x * y > 0).astype(int)})

df
x y val
0 0.195254 -0.758477 0
1 0.860757 -0.507861 0
2 0.411054 0.099882 1
3 0.179533 1.002380 1
4 -0.305381 -0.665970 1
... ... ... ...
495 -0.913389 -1.609295 1
496 -0.178223 0.059689 0
497 -0.393146 1.753648 0
498 -1.006346 -1.085414 1
499 0.023466 0.708565 1

500 rows × 3 columns

# Turn the frame into a two-column feature matrix and an integer label vector.
x = df[["x", "y"]].to_numpy()
y = df["val"].to_numpy().astype(int)
def plot_cluster(x, y, model=None, samples=None, name=None):
    """Show a Plotly figure of the class regions with the points of ``x`` on top.

    Args:
        x: 2-D array; column 0 is plotted on the x axis, column 1 on the y axis.
        y: Per-point values used to colour the points when ``model`` is given.
        model: Optional object with ``predict``. When omitted, the background
            is the rule "1 where the two coordinates multiply to a positive
            number, else 0" and the points are drawn in plain orange.
        samples: Optional 2-D array of extra points, drawn in white.
        name: Optional figure title.
    """
    pad = 0.5
    step = 0.05
    x_min, x_max = x[:, 0].min() - pad, x[:, 0].max() + pad
    y_min, y_max = x[:, 1].min() - pad, x[:, 1].max() + pad

    grid_x = np.arange(x_min, x_max, step)
    grid_y = np.arange(y_min, y_max, step)
    xx, yy = np.meshgrid(grid_x, grid_y)

    if model is not None:
        Z = model.predict(np.c_[xx.ravel(), yy.ravel()]).reshape(xx.shape)
    else:
        # Ground-truth rule evaluated on the whole grid at once.
        Z = np.where(xx * yy > 0, 1.0, 0.0)

    fig = go.Figure()

    # Background: class regions rendered as a heatmap-coloured contour.
    fig.add_trace(go.Contour(
        x=grid_x,
        y=grid_y,
        z=Z,
        colorscale='RdYlBu',
        contours_coloring='heatmap',
        opacity=0.75,
    ))

    # Foreground: the data points, coloured by label only when a model is shown.
    if model is None:
        point_marker = dict(color='orange', line=dict(color='black', width=1))
    else:
        point_marker = dict(color=y, colorscale='RdYlBu', line=dict(color='black', width=1))
    fig.add_trace(go.Scatter(
        x=x[:, 0],
        y=x[:, 1],
        mode='markers',
        marker=point_marker,
    ))

    if samples is not None:
        # Highlight the extra samples in white.
        fig.add_trace(go.Scatter(
            x=samples[:, 0],
            y=samples[:, 1],
            mode='markers',
            marker=dict(color='white', colorscale='RdYlBu', line=dict(color='black', width=1))
        ))

    layout = dict(
        xaxis=dict(range=(x_min, x_max)),
        yaxis=dict(range=(y_min, y_max)),
        width=700,
        height=600,
    )
    if name:
        layout["title"] = name
    fig.update_layout(**layout)

    fig.show()
def plot_contour(x, model=None):
    """Return the ``(xx, yy, Z)`` grid used to draw class regions.

    The grid spans the extent of ``x`` (column 0 horizontally, column 1
    vertically) padded by 0.5 on each side, sampled every 0.05.

    Args:
        x: 2-D array of points.
        model: Optional object with ``predict``. When omitted, ``Z`` is 1.0
            where the two grid coordinates multiply to a positive number and
            0.0 elsewhere.

    Returns:
        Tuple ``(xx, yy, Z)`` of equally shaped 2-D arrays.
    """
    pad, step = 0.5, 0.05
    col_x, col_y = x[:, 0], x[:, 1]
    xx, yy = np.meshgrid(
        np.arange(col_x.min() - pad, col_x.max() + pad, step),
        np.arange(col_y.min() - pad, col_y.max() + pad, step),
    )

    if model is not None:
        predicted = model.predict(np.c_[xx.ravel(), yy.ravel()])
        return (xx, yy, predicted.reshape(xx.shape))

    # No model: evaluate the ground-truth rule on the whole grid at once.
    return (xx, yy, np.where(xx * yy > 0, 1.0, 0.0))
def plot_accuracy(title, overall_accuracy, models_accuracy_list, classifiers, iteration):
    """Show one line chart with each model's accuracy history and the overall one.

    Args:
        title: Figure title.
        overall_accuracy: Accuracy history plotted under the name "Overall".
        models_accuracy_list: One accuracy history per model.
        classifiers: Trace names, indexed in parallel with
            ``models_accuracy_list``.
        iteration: Number of x positions; the x axis is ``0 .. iteration - 1``.
    """
    steps = np.arange(iteration)
    fig = px.line(title=title, labels={"x": "Iterations", "y": "Accuracy"})

    for idx, history in enumerate(models_accuracy_list):
        fig.add_scatter(x=steps, y=history, name=classifiers[idx])

    fig.add_scatter(x=steps, y=overall_accuracy, name="Overall")
    fig.show()
class plotAnimation:
    """Record per-iteration snapshots of a classifier and replay them as a video.

    ``update`` stores one frame (labelled points, newest batch, accuracies and
    the class-region grid at that moment); ``show`` renders all stored frames
    as an HTML5 video in the notebook.
    """

    def __init__(self, x, y, model=None, model_name=None):
        """Remember the full dataset, the model to draw and the plot extent.

        Args:
            x: 2-D array of all points; fixes the axis limits (extent of each
                column padded by 0.5).
            y: Values belonging to ``x``.
            model: Optional object with ``predict`` whose regions are drawn.
            model_name: Optional prefix for the frame title.
        """
        self.x_whole = x
        self.y_whole = y
        self.model = model
        self.model_name = model_name

        self.xlim = (x[:, 0].min() - 0.5, x[:, 0].max() + 0.5)
        self.ylim = (x[:, 1].min() - 0.5, x[:, 1].max() + 0.5)

        # One entry per recorded frame, appended in lockstep by update().
        self.data = []
        self.contour = []

    def update(self, x, y, samples, train_acc, test_acc):
        """Record one frame.

        Args:
            x: 2-D array of the currently labelled points.
            y: Labels of ``x``.
            samples: 2-D array of the newest batch (drawn in white).
            train_acc: Train accuracy shown in the frame title.
            test_acc: Test accuracy shown in the frame title.
        """
        self.data.append((x, y, samples, train_acc, test_acc))
        # The grid is computed now so it reflects the model's current state.
        self.contour.append(plot_contour(self.x_whole, self.model))

    def _style_axes(self):
        """Apply the axis labels and limits shared by every frame."""
        self.ax.set_xlabel("x")
        self.ax.set_ylabel("y")
        self.ax.set_xlim(self.xlim)
        self.ax.set_ylim(self.ylim)

    def animate(self, i):
        """Draw frame ``i`` on ``self.ax`` and return the axes."""
        x_frame, y_frame, batch, train_acc, test_acc = self.data[i]
        grid_x, grid_y, grid_z = self.contour[i]

        # Start from clean axes: without this every frame stacks another
        # translucent contour, scatter and legend on top of the previous ones.
        # clear() also resets labels and limits, so they are re-applied.
        self.ax.clear()
        self._style_axes()

        self.ax.contourf(grid_x, grid_y, grid_z, cmap="RdYlBu", alpha=0.8)

        if self.model is None:
            # No model: draw the whole dataset unlabelled. There are no class
            # colours, so no "Classes" legend is created in this branch.
            self.ax.scatter(self.x_whole[:, 0], self.x_whole[:, 1], color="orange", edgecolors="k")
        else:
            points = self.ax.scatter(x_frame[:, 0], x_frame[:, 1], c=y_frame, cmap="RdYlBu", edgecolors="k")
            class_legend = self.ax.legend(*points.legend_elements(), loc="upper right", title="Classes")
            # Keep the class legend alive when the second legend is created below.
            self.ax.add_artist(class_legend)

        last_batch = self.ax.scatter(batch[:, 0], batch[:, 1], color="white", label="Last Batch", edgecolors="k")
        self.ax.legend([last_batch], ["Last Batch"], loc="lower right")

        title = f"Iteration: {i}, Train Accuracy: {round(train_acc, 3)}, Test Accuracy: {round(test_acc, 3)}"

        if self.model_name:
            title = self.model_name + " " + title

        self.ax.set_title(title)

        return self.ax

    def show(self):
        """Render all recorded frames as an HTML5 video and display it."""
        self.fig, self.ax = plt.subplots()
        self._style_axes()

        anim = animation.FuncAnimation(self.fig, self.animate, frames=len(self.data), interval=500, blit=False, repeat=False)
        video = anim.to_html5_video()

        html = display.HTML(video)
        display.display(html)
# Draw the ground-truth regions (no model) with every point shown unlabelled.
plot_cluster(x, y, None, None)
rf = RandomForestClassifier(n_estimators=100, random_state=1)
# NOTE(review): z is built the same way as y and is not used again in the visible code.
z = np.array(df["val"]).astype(int)

# 80/20 split; the held-out 20% is then halved into validation and test sets.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=1)
x_val, x_test, y_val, y_test = train_test_split(x_test, y_test, test_size=0.5, random_state=1)
# Baseline: a random forest fitted on the full training split.
rf.fit(x_train, y_train)
RandomForestClassifier(random_state=1)
In a Jupyter environment, please rerun this cell to show the HTML representation or trust the notebook.
On GitHub, the HTML representation is unable to render, please try loading this page with nbviewer.org.
# NOTE(review): z_test is not used again in the visible code.
z_test = rf.predict(x_test)
# Show the baseline forest's regions with the test points coloured by their labels.
plot_cluster(x_test,y_test, model=rf)
# Mean accuracy of the baseline forest on the test split (notebook cell output).
rf.score(x_test, y_test)
1.0

Starting Active Learning

# 1010 fresh points drawn uniformly from [-2, 2) x [-2, 2); the RNG is not
# reseeded here, so the draw continues the stream seeded earlier.
x = 4 * np.random.rand(1010) - 2
y = 4 * np.random.rand(1010) - 2

# Label is 1 when both coordinates share a sign (x * y > 0), otherwise 0.
# Built in one vectorized step instead of concatenating a one-row frame per
# point, which re-copies the whole frame every iteration (quadratic).
df = pd.DataFrame({"x": x, "y": y, "val": (x * y > 0).astype(int)})

df
x y val
0 0.371521 -0.292722 0
1 -1.959745 -0.927563 1
2 -0.096695 -1.473613 1
3 0.835082 -1.843158 0
4 -1.824098 -1.899073 1
... ... ... ...
1005 -0.788381 -1.719306 1
1006 0.309136 0.003035 1
1007 -1.321288 1.854398 0
1008 -1.362124 0.447133 0
1009 -0.331881 -0.799873 1

1010 rows × 3 columns

# Feature matrix and integer labels for the active-learning experiment.
x = df[["x", "y"]].to_numpy()
y = df["val"].to_numpy().astype(int)

# 80% train; the remaining 20% is split evenly into validation and test.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=1)
x_val, x_test, y_val, y_test = train_test_split(x_test, y_test, test_size=0.5, random_state=1)

# Independent copies of the training split for the random-sampling baseline.
x_train_random, y_train_random = deepcopy(x_train), deepcopy(y_train)

# The first 10 training rows are the initial labelled set; the rest is the pool.
x_training, x_pool = x_train[:10], x_train[10:]
y_training, y_pool = y_train[:10], y_train[10:]

# Same partition for the baseline.
x_training_random, x_pool_random = x_train_random[:10], x_train_random[10:]
y_training_random, y_pool_random = y_train_random[:10], y_train_random[10:]
def entropy_sampling(stack):
    """Return the Shannon entropy (base 2) of the values in each row of ``stack``.

    Each row is treated as a set of votes: the entropy is computed from the
    relative frequency of every distinct value in the row. A row whose values
    are all identical scores 0.

    Args:
        stack: Iterable of 1-D arrays, one per item.

    Returns:
        A list with one entropy value per row.
    """
    def _row_entropy(votes):
        _, counts = np.unique(votes, return_counts=True)
        if counts.size == 1:
            return 0
        shares = counts / votes.shape[0]
        return -sum(share * np.log2(share) for share in shares)

    return [_row_entropy(votes) for votes in stack]
def select_samples(entropy_list, samples):
    """Return the indices of the ``samples`` largest entries of ``entropy_list``.

    Args:
        entropy_list: Sequence of scores, one per candidate.
        samples: How many indices to return. If it exceeds the number of
            candidates, all indices are returned.

    Returns:
        Integer index array ordered by ascending score (highest score last).
        Empty when ``samples`` is not positive.
    """
    # Guard explicitly: a slice of ``[-0:]`` would select every index, not none.
    if samples <= 0:
        return np.empty(0, dtype=np.intp)
    ranked = np.argsort(entropy_list)
    return ranked[-samples:]
def output_list(stack):
    """Return the most frequent value of each row of ``stack`` (majority vote).

    Rows must hold non-negative integers. When several values are equally
    frequent, the smallest one is chosen.

    Args:
        stack: Iterable of 1-D integer arrays, one per item.

    Returns:
        Array with one winning value per row.
    """
    winners = []
    for votes in stack:
        tally = np.bincount(votes)
        winners.append(int(tally.argmax()))
    return np.array(winners)
def output_accuracy_calc(output_list, y_val):
    """Return the percentage (0-100) of entries of ``output_list`` equal to ``y_val``.

    Args:
        output_list: Predicted values.
        y_val: Array of reference values, compared element-wise.
    """
    hits = (output_list == y_val).sum()
    return 100 * hits / y_val.size
# Display names, index-aligned with the model lists below.
classifiers = ("Logistic Regression", "Random Forest", "Decision Tree")

# One committee for entropy-driven selection and an identical one for the
# random-sampling baseline.
models = [LogisticRegression(), RandomForestClassifier(random_state=1), DecisionTreeClassifier(random_state=1)]
models_random = [LogisticRegression(), RandomForestClassifier(random_state=1), DecisionTreeClassifier(random_state=1)]

# One animation recorder per committee member.
anim_model = [plotAnimation(x, y, model=clf, model_name=label) for label, clf in zip(classifiers, models)]
# Number of pool points moved into the labelled set per iteration.
samples = 5

# Per-model accuracy histories (one inner list per model).
models_accuracy_list = [[] for _ in models]
models_accuracy_list_random = [[] for _ in models_random]

models_test_accuracy_list = [[] for _ in models]
models_test_accuracy_list_random = [[] for _ in models_random]

# Majority-vote accuracy histories.
overall_accuracy = []
overall_test_accuracy = []

overall_accuracy_random = []
overall_test_accuracy_random = []
from tqdm import trange

# Active-learning loop. Each iteration:
#   1. refits both committees on their current labelled sets and records
#      per-model and majority-vote accuracy on the training and test splits;
#   2. moves the `samples` pool points on which the committee's votes have the
#      highest entropy into the active labelled set;
#   3. moves `samples` uniformly chosen pool points into the baseline's set.
for iteration in trange(len(x_train)//samples):
    # Stop once either pool can no longer supply a full batch.
    if x_pool.shape[0] < samples or x_pool_random.shape[0] < samples:
        break

    y_pred = []
    y_pred_train = []
    y_pred_test = []
    y_pred_train_random = []
    y_pred_test_random = []

    for model_idx, model in enumerate(models):
        model.fit(x_training, y_training)
        pred_train = model.predict(x_train)
        models_accuracy_list[model_idx].append(accuracy_score(y_train, pred_train))
        pred_test = model.predict(x_test)
        models_test_accuracy_list[model_idx].append(accuracy_score(y_test, pred_test))
        y_pred.append(np.array(model.predict(x_pool)))
        y_pred_train.append(np.array(pred_train))
        y_pred_test.append(np.array(pred_test))

    for model_idx, model in enumerate(models_random):
        model.fit(x_training_random, y_training_random)
        pred_train = model.predict(x_train_random)
        models_accuracy_list_random[model_idx].append(accuracy_score(y_train_random, pred_train))
        pred_test = model.predict(x_test)
        models_test_accuracy_list_random[model_idx].append(accuracy_score(y_test, pred_test))
        y_pred_train_random.append(np.array(pred_train))
        y_pred_test_random.append(np.array(pred_test))

    # Rows = points, columns = one vote per committee member.
    stack = np.stack(y_pred, axis=-1)
    stack_train = np.stack(y_pred_train, axis=-1)
    stack_test = np.stack(y_pred_test, axis=-1)

    sample_idx = select_samples(entropy_sampling(stack), samples)

    stack_train_random = np.stack(y_pred_train_random, axis=-1)
    stack_test_random = np.stack(y_pred_test_random, axis=-1)

    # Draw the baseline batch without replacement and from the baseline's own
    # pool. Sampling with replacement could add the same row to the training
    # set twice while np.delete removed it from the pool only once.
    random_idx = np.random.choice(x_pool_random.shape[0], samples, replace=False)

    # Majority-vote accuracy of each committee.
    output_train = output_list(stack_train)
    overall_accuracy.append(accuracy_score(y_train, output_train))
    output_test = output_list(stack_test)
    overall_test_accuracy.append(accuracy_score(y_test, output_test))

    output_train_random = output_list(stack_train_random)
    overall_accuracy_random.append(accuracy_score(y_train_random, output_train_random))
    output_test_random = output_list(stack_test_random)
    overall_test_accuracy_random.append(accuracy_score(y_test, output_test_random))

    # Record this iteration's frame, using the accuracies appended just above.
    for recorder, train_history, test_history in zip(anim_model, models_accuracy_list, models_test_accuracy_list):
        recorder.update(x_training, y_training, x_pool[sample_idx], train_history[-1], test_history[-1])

    # Move the selected points from the pools into the labelled sets.
    x_training = np.append(x_training, x_pool[sample_idx], axis=0)
    y_training = np.append(y_training, y_pool[sample_idx], axis=0)

    x_training_random = np.append(x_training_random, x_pool_random[random_idx], axis=0)
    y_training_random = np.append(y_training_random, y_pool_random[random_idx], axis=0)

    x_pool = np.delete(x_pool, sample_idx, axis=0)
    y_pool = np.delete(y_pool, sample_idx, axis=0)

    x_pool_random = np.delete(x_pool_random, random_idx, axis=0)
    y_pool_random = np.delete(y_pool_random, random_idx, axis=0)

for recorder in anim_model:
    recorder.show()

plt.close("all")
 99%|█████████▉| 159/161 [01:23<00:01,  1.92it/s]
# Training accuracy of each committee member and of the majority vote.
# The x-axis length is taken from the recorded history rather than from the
# loop variable, which only equals the history length when the loop ended
# through its `break`.
plot_accuracy(title="Training Accuracy", overall_accuracy=overall_accuracy, models_accuracy_list=models_accuracy_list, classifiers=classifiers, iteration=len(overall_accuracy))

# Majority-vote training accuracy: active learning vs. random sampling.
fig = px.line()
fig.add_scatter(x=np.arange(len(overall_accuracy)), y=overall_accuracy, name="Active Learning")
fig.add_scatter(x=np.arange(len(overall_accuracy_random)), y=overall_accuracy_random, name="Random Sampling")

fig.show()

# Per-model test accuracy. This figure previously re-plotted the training
# histories against an x axis sized from the test history.
fig = px.line()
for label, test_history in zip(classifiers, models_test_accuracy_list):
    fig.add_scatter(x=np.arange(len(test_history)), y=test_history, name=label)

fig.show()

# Final class regions of each committee member over the test points.
for model in models:
    plot_cluster(x_test, y_test, model=model)