# import necessary libraries
# libraries for data visualization
import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt
plt.rcParams['animation.ffmpeg_path'] = '/usr/bin/ffmpeg'
# libraries for data loading and processing
import numpy as np
import pandas as pd
from copy import deepcopy
# libraries for data preprocessing
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
# libraries for displaying animation
import matplotlib.animation as animation
from IPython import display
# library for ignoring warnings
import warnings
warnings.filterwarnings('ignore')

Active Learning Classification

Active Learning

Active learning is a machine learning technique that involves iteratively selecting the most informative samples from a large unlabeled dataset and adding them to a smaller labeled dataset. The goal of active learning is to reduce the amount of labeled data required for training while maintaining or improving the performance of a machine learning model.

The active learning process typically involves the following steps:

1. Initialize the learning algorithm
2. Select unlabeled samples
3. Request labels
4. Incorporate labeled samples
5. Evaluate model performance

Active learning has been applied in various domains and has proven effective in scenarios where labeled data is limited or costly to obtain, such as text classification and image recognition.
# torch.manual_seed(0)
# Seed NumPy's global RNG so the sampled points are reproducible.
np.random.seed(0)

# 500 points drawn uniformly from the square [-2, 2) x [-2, 2).
x = 4 * np.random.rand(500) - 2
y = 4 * np.random.rand(500) - 2

fig = px.scatter(x=x, y=y, width=600, height=600)
fig.show()

# Label is 1 when x and y share a sign (x * y > 0), otherwise 0.
# Built in one vectorized step instead of concatenating one-row frames in a
# loop, which copied the whole frame on every iteration.
df = pd.DataFrame({"x": x, "y": y, "val": (x * y > 0).astype(int)})
df

|  | x | y | val |
|---|---|---|---|
| 0 | 0.195254 | -0.758477 | 0 |
| 1 | 0.860757 | -0.507861 | 0 |
| 2 | 0.411054 | 0.099882 | 1 |
| 3 | 0.179533 | 1.002380 | 1 |
| 4 | -0.305381 | -0.665970 | 1 |
| ... | ... | ... | ... |
| 495 | -0.913389 | -1.609295 | 1 |
| 496 | -0.178223 | 0.059689 | 0 |
| 497 | -0.393146 | 1.753648 | 0 |
| 498 | -1.006346 | -1.085414 | 1 |
| 499 | 0.023466 | 0.708565 | 1 |
500 rows × 3 columns
# Feature matrix (columns x, y) and integer labels taken from the data frame.
x = np.array(df[["x", "y"]])
y = np.array(df["val"]).astype(int)


def plot_cluster(x, y, model=None, samples=None, name=None):
    """Show a plotly figure of the decision regions with the data on top.

    Args:
        x: 2-D array; column 0 is plotted on the x axis, column 1 on the y axis.
        y: Per-point values used to colour the markers when ``model`` is given.
        model: Optional object with a ``predict`` method. When ``None`` the
            background shows the rule ``x * y > 0`` instead of predictions.
        samples: Optional 2-D array of extra points drawn as white markers.
        name: Optional figure title.
    """
    x_min, x_max = x[:, 0].min() - 0.5, x[:, 0].max() + 0.5
    y_min, y_max = x[:, 1].min() - 0.5, x[:, 1].max() + 0.5
    # plot_contour builds the same 0.05-spaced grid over the padded bounds.
    xx, yy, Z = plot_contour(x, model)

    fig = go.Figure()
    # Plot decision boundaries
    fig.add_trace(go.Contour(
        x=xx[0],
        y=yy[:, 0],
        z=Z,
        colorscale='RdYlBu',
        contours_coloring='heatmap',
        opacity=0.75,
    ))
    if model is None:
        # Plot data points when no model is provided
        fig.add_trace(go.Scatter(
            x=x[:, 0],
            y=x[:, 1],
            mode='markers',
            marker=dict(color='orange', line=dict(color='black', width=1)),
        ))
    else:
        # Plot data points with labels when a model is provided
        fig.add_trace(go.Scatter(
            x=x[:, 0],
            y=x[:, 1],
            mode='markers',
            marker=dict(color=y, colorscale='RdYlBu', line=dict(color='black', width=1))
        ))
    # NOTE(review): the original nesting of this branch was lost in the
    # export; it is placed at function level so samples are drawn with or
    # without a model - confirm against the notebook.
    if samples is not None:
        # Plot additional samples if available
        fig.add_trace(go.Scatter(
            x=samples[:, 0],
            y=samples[:, 1],
            mode='markers',
            marker=dict(color='white', colorscale='RdYlBu', line=dict(color='black', width=1))
        ))
    if name:
        fig.update_layout(title=name)
    fig.update_layout(
        xaxis=dict(range=(x_min, x_max)),
        yaxis=dict(range=(y_min, y_max)),
        width=700,
        height=600
    )
    fig.show()


def plot_contour(x, model=None):
    """Evaluate the decision surface on a grid covering the data.

    Args:
        x: 2-D array whose first two columns define the grid bounds
            (each padded by 0.5, grid step 0.05).
        model: Optional object with a ``predict`` method. When ``None`` the
            surface is the rule ``x * y > 0`` (1.0 where true, else 0.0).

    Returns:
        Tuple ``(xx, yy, Z)``: the two meshgrid arrays and the surface values
        reshaped to the grid.
    """
    x_min, x_max = x[:, 0].min() - 0.5, x[:, 0].max() + 0.5
    y_min, y_max = x[:, 1].min() - 0.5, x[:, 1].max() + 0.5
    xx, yy = np.meshgrid(np.arange(x_min, x_max, 0.05), np.arange(y_min, y_max, 0.05))
    if model is None:
        # Vectorized form of the per-cell loop: 1.0 where the signs agree.
        Z = (xx * yy > 0).astype(float)
    else:
        Z = model.predict(np.c_[xx.ravel(), yy.ravel()]).reshape(xx.shape)
    return (xx, yy, Z)


def plot_accuracy(title, overall_accuracy, models_accuracy_list, classifiers, iteration):
    """Show a line chart with one accuracy curve per classifier plus "Overall".

    Args:
        title: Figure title.
        overall_accuracy: Accuracy values for the "Overall" curve.
        models_accuracy_list: One sequence of accuracy values per classifier.
        classifiers: Curve names, paired by position with
            ``models_accuracy_list``.
        iteration: Number of x positions; the x axis is ``0 .. iteration - 1``.
    """
    fig = px.line(title=title, labels={"x": "Iterations", "y": "Accuracy"})
    steps = np.arange(iteration)
    for label, history in zip(classifiers, models_accuracy_list):
        fig.add_scatter(x=steps, y=history, name=label)
    fig.add_scatter(x=steps, y=overall_accuracy, name="Overall")
    fig.show()


class plotAnimation:
    """Collect per-iteration snapshots and render them as an HTML5 video."""

    def __init__(self, x, y, model=None, model_name=None):
        """Store the full data set, the model to visualize and the plot bounds.

        Args:
            x: 2-D array of all points; used for the axis limits and the grid.
            y: Values associated with ``x`` (stored, not otherwise used here).
            model: Optional object with ``predict``; queried on every
                ``update`` call.
            model_name: Optional prefix for the frame title.
        """
        self.x_whole = x
        self.y_whole = y
        self.model = model
        self.model_name = model_name
        self.xlim = (x[:, 0].min() - 0.5, x[:, 0].max() + 0.5)
        self.ylim = (x[:, 1].min() - 0.5, x[:, 1].max() + 0.5)
        self.data = []     # one (x, y, samples, train_acc, test_acc) per frame
        self.contour = []  # one (xx, yy, Z) grid per frame

    def update(self, x, y, samples, train_acc, test_acc):
        """Record one frame: the given data plus the model's current surface."""
        self.data.append((x, y, samples, train_acc, test_acc))
        # The surface is evaluated now, so it reflects the model's state at
        # the time of this call rather than at render time.
        self.contour.append(plot_contour(self.x_whole, self.model))

    def _reset_axes(self):
        """Clear the axes and re-apply labels and limits for a fresh frame."""
        self.ax.clear()
        self.ax.set_xlabel("x")
        self.ax.set_ylabel("y")
        self.ax.set_xlim(self.xlim)
        self.ax.set_ylim(self.ylim)

    def animate(self, i):
        """Draw frame ``i`` on ``self.ax`` and return the axes."""
        points, labels, batch, train_acc, test_acc = self.data[i]
        xx, yy, surface = self.contour[i]

        # Start from empty axes; otherwise every frame's contours, markers
        # and legends accumulate on top of the previous ones.
        self._reset_axes()
        self.ax.contourf(xx, yy, surface, cmap=plt.cm.RdYlBu, alpha=0.8)
        if self.model is None:
            # x_whole is (n, 2): plot its two columns against each other.
            self.ax.scatter(self.x_whole[:, 0], self.x_whole[:, 1], color="orange", edgecolors="k")
        else:
            scat = self.ax.scatter(points[:, 0], points[:, 1], c=labels, cmap="RdYlBu", edgecolors="k")
            legend1 = self.ax.legend(*scat.legend_elements(), loc = "upper right", title = "Classes")
            # Keep the class legend alive when the second legend is created.
            self.ax.add_artist(legend1)
        # NOTE(review): original nesting was lost in the export; the last
        # batch is drawn for every frame regardless of model - confirm.
        scat = self.ax.scatter(batch[:, 0], batch[:, 1], color="white", label="Last Batch", edgecolors="k")
        self.ax.legend([scat], ["Last Batch"], loc="lower right")
        title = f"Iteration: {i}, Train Accuracy: {round(train_acc, 3)}, Test Accuracy: {round(test_acc, 3)}"
        if self.model_name:
            title = self.model_name + " " + title
        self.ax.set_title(title)
        return self.ax

    def show(self):
        """Render all recorded frames and display them as an HTML5 video."""
        self.fig, self.ax = plt.subplots()
        self.ax.set_xlabel("x")
        self.ax.set_ylabel("y")
        self.ax.set_xlim(self.xlim)
        self.ax.set_ylim(self.ylim)
        anim = animation.FuncAnimation(self.fig, self.animate, frames=len(self.data), interval=500, blit=False, repeat=False)
        video = anim.to_html5_video()
        html = display.HTML(video)
        display.display(html)


# Ground-truth regions with the raw, unlabelled points.
plot_cluster(x, y, None, None)

rf = RandomForestClassifier(n_estimators=100, random_state=1)

z = np.array(df["val"]).astype(int)
# 80 % train, then the held-out 20 % is halved into validation and test.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=1)
x_val, x_test, y_val, y_test = train_test_split(x_test, y_test, test_size=0.5, random_state=1)

rf.fit(x_train, y_train)
# Recorded notebook output:
#   RandomForestClassifier(random_state=1)
#   In a Jupyter environment, please rerun this cell to show the HTML representation or trust the notebook.
#   On GitHub, the HTML representation is unable to render, please try loading this page with nbviewer.org.
#   RandomForestClassifier(random_state=1)

z_test = rf.predict(x_test)

plot_cluster(x_test, y_test, model=rf)

rf.score(x_test, y_test)
# Recorded notebook output: 1.0
Starting Active Learning
# 1010 fresh points drawn uniformly from the square [-2, 2) x [-2, 2).
x = 4 * np.random.rand(1010) - 2
y = 4 * np.random.rand(1010) - 2

# Label is 1 when x and y share a sign (x * y > 0), otherwise 0.
# Built in one vectorized step instead of concatenating one-row frames in a
# loop, which copied the whole frame on every iteration.
df = pd.DataFrame({"x": x, "y": y, "val": (x * y > 0).astype(int)})
df

|  | x | y | val |
|---|---|---|---|
| 0 | 0.371521 | -0.292722 | 0 |
| 1 | -1.959745 | -0.927563 | 1 |
| 2 | -0.096695 | -1.473613 | 1 |
| 3 | 0.835082 | -1.843158 | 0 |
| 4 | -1.824098 | -1.899073 | 1 |
| ... | ... | ... | ... |
| 1005 | -0.788381 | -1.719306 | 1 |
| 1006 | 0.309136 | 0.003035 | 1 |
| 1007 | -1.321288 | 1.854398 | 0 |
| 1008 | -1.362124 | 0.447133 | 0 |
| 1009 | -0.331881 | -0.799873 | 1 |
1010 rows × 3 columns
# Feature matrix (columns x, y) and integer labels taken from the data frame.
x = np.array(df[["x", "y"]])
y = np.array(df["val"]).astype(int)

# 80 % train, then the held-out 20 % is halved into validation and test.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=1)
x_val, x_test, y_val, y_test = train_test_split(x_test, y_test, test_size=0.5, random_state=1)

# Independent copy of the training data for the random-sampling baseline.
x_train_random = deepcopy(x_train)
y_train_random = deepcopy(y_train)

# Both strategies start from the same first 10 labelled points ...
x_training = x_train[:10]
y_training = y_train[:10]
x_training_random = x_train_random[:10]
y_training_random = y_train_random[:10]

# ... and draw further points from the remainder (the pool).
x_pool = x_train[10:]
y_pool = y_train[10:]
x_pool_random = x_train_random[10:]
y_pool_random = y_train_random[10:]


def entropy_sampling(stack):
    """Return the vote entropy (base 2) of each row of ``stack``.

    Args:
        stack: Iterable of 1-D arrays; each array holds the predictions made
            for one sample (one entry per model).

    Returns:
        List with one entropy value per row; 0 when all entries of the row
        agree.
    """
    entropy_list = []
    for votes in stack:
        _, counts = np.unique(votes, return_counts=True)
        if len(counts) == 1:
            # Unanimous vote: no disagreement.
            entropy_list.append(0)
            continue
        total = votes.shape[0]
        entropy_list.append(-sum(c / total * np.log2(c / total) for c in counts))
    return entropy_list


def select_samples(entropy_list, samples):
    """Return the indices of the ``samples`` largest entries of ``entropy_list``.

    Indices are in ascending order of entropy (as produced by ``np.argsort``).
    A non-positive ``samples`` yields an empty selection; without the guard
    ``[-0:]`` would select every index.
    """
    order = np.argsort(entropy_list)
    if samples <= 0:
        return order[:0]
    return order[-samples:]


def output_list(stack):
    """Return the majority vote of each row of ``stack`` as an int array.

    Each row must contain non-negative integers (requirement of
    ``np.bincount``); ties resolve to the smallest value (``argmax``).
    """
    majority = np.array([int(np.bincount(votes).argmax()) for votes in stack])
    return majority


def output_accuracy_calc(output_list, y_val):
    """Return the percentage (0-100) of entries where the two arrays match."""
    return 100 * np.sum(output_list == y_val) / y_val.size


classifiers = ("Logistic Regression", "Random Forest", "Decision Tree")
# Committee trained on actively selected samples, and an identically
# configured committee trained on randomly selected samples.
models = [LogisticRegression(), RandomForestClassifier(random_state=1), DecisionTreeClassifier(random_state=1)]
models_random = [LogisticRegression(), RandomForestClassifier(random_state=1), DecisionTreeClassifier(random_state=1)]
# One animation recorder per actively trained model.
anim_model = [
    plotAnimation(x, y, model=model, model_name=label)
    for label, model in zip(classifiers, models)
]

# Number of pool points added to the training set per iteration.
samples = 5

# Per-model accuracy histories (one inner list per model), sized from the
# committee instead of a hard-coded three.
models_accuracy_list = [[] for _ in models]
models_accuracy_list_random = [[] for _ in models_random]
models_test_accuracy_list = [[] for _ in models]
models_test_accuracy_list_random = [[] for _ in models_random]

# Majority-vote ("overall") accuracy histories.
overall_accuracy = []
overall_test_accuracy = []
overall_accuracy_random = []
overall_test_accuracy_random = []

from tqdm import trange
# Active-learning loop. Each iteration refits both committees, records their
# accuracies, then moves `samples` points from each pool into the matching
# training set: highest vote entropy for the active committee, uniformly
# random for the baseline.
for iteration in trange(len(x_train) // samples):
    if x_pool.shape[0] < samples:
        # Not enough unlabelled points left for a full batch.
        break

    y_pred = []
    y_pred_train = []
    y_pred_test = []
    y_pred_random = []
    y_pred_train_random = []
    y_pred_test_random = []

    # Actively trained committee.
    for model_idx, model in enumerate(models):
        model.fit(x_training, y_training)
        pred_train = model.predict(x_train)
        models_accuracy_list[model_idx].append(accuracy_score(y_train, pred_train))
        pred_test = model.predict(x_test)
        models_test_accuracy_list[model_idx].append(accuracy_score(y_test, pred_test))
        y_pred.append(np.array(model.predict(x_pool)))
        y_pred_train.append(np.array(pred_train))
        y_pred_test.append(np.array(pred_test))

    # Randomly trained baseline committee.
    for model_idx, model in enumerate(models_random):
        model.fit(x_training_random, y_training_random)
        pred_train = model.predict(x_train_random)
        models_accuracy_list_random[model_idx].append(accuracy_score(y_train_random, pred_train))
        pred_test = model.predict(x_test)
        models_test_accuracy_list_random[model_idx].append(accuracy_score(y_test, pred_test))
        y_pred_train_random.append(np.array(pred_train))
        y_pred_test_random.append(np.array(pred_test))

    # Rows are samples, columns are the individual models' predictions.
    stack = np.stack(y_pred, axis=-1)
    stack_train = np.stack(y_pred_train, axis=-1)
    stack_test = np.stack(y_pred_test, axis=-1)
    stack_train_random = np.stack(y_pred_train_random, axis=-1)
    stack_test_random = np.stack(y_pred_test_random, axis=-1)

    # Pool points on which the committee disagrees most.
    sample_idx = select_samples(entropy_sampling(stack), samples)
    # Draw WITHOUT replacement from the baseline's own pool. The previous
    # randint draw could repeat an index, which duplicated training points
    # and made np.delete remove fewer than `samples` rows.
    random_idx = np.random.choice(x_pool_random.shape[0], samples, replace=False)

    # Majority-vote accuracy of each committee.
    output_train = output_list(stack_train)
    overall_accuracy.append(accuracy_score(y_train, output_train))
    output_test = output_list(stack_test)
    overall_test_accuracy.append(accuracy_score(y_test, output_test))
    output_train_random = output_list(stack_train_random)
    overall_accuracy_random.append(accuracy_score(y_train_random, output_train_random))
    output_test_random = output_list(stack_test_random)
    overall_test_accuracy_random.append(accuracy_score(y_test, output_test_random))

    # Snapshot the state BEFORE the new batch joins the training set, so the
    # batch is shown as "last batch" on top of the current training points.
    for model_idx, recorder in enumerate(anim_model):
        recorder.update(
            x_training,
            y_training,
            x_pool[sample_idx],
            models_accuracy_list[model_idx][iteration],
            models_test_accuracy_list[model_idx][iteration],
        )

    # Move the selected points from the pools into the training sets.
    x_training = np.append(x_training, x_pool[sample_idx], axis=0)
    y_training = np.append(y_training, y_pool[sample_idx], axis=0)
    x_training_random = np.append(x_training_random, x_pool_random[random_idx], axis=0)
    y_training_random = np.append(y_training_random, y_pool_random[random_idx], axis=0)
    x_pool = np.delete(x_pool, sample_idx, axis=0)
    y_pool = np.delete(y_pool, sample_idx, axis=0)
    x_pool_random = np.delete(x_pool_random, random_idx, axis=0)
    y_pool_random = np.delete(y_pool_random, random_idx, axis=0)

# Render one video per actively trained model, then release the figures.
for recorder in anim_model:
    recorder.show()
plt.close("all")
# Recorded notebook output:  99%|█████████▉| 159/161 [01:23<00:01, 1.92it/s]
# Training accuracy of every model and of the majority vote. The x-axis
# length comes from the recorded history rather than the leftover loop
# variable, which only matches when the loop ended through `break`.
plot_accuracy(
    title="Training Accuracy",
    overall_accuracy=overall_accuracy,
    models_accuracy_list=models_accuracy_list,
    classifiers=classifiers,
    iteration=len(overall_accuracy),
)

# Majority-vote training accuracy: active learning vs. random sampling.
fig = px.line()
fig.add_scatter(x=np.arange(len(overall_accuracy)), y=overall_accuracy, name="Active Learning")
fig.add_scatter(x=np.arange(len(overall_accuracy_random)), y=overall_accuracy_random, name="Random Sampling")
fig.show()

# Per-model TEST accuracy. The x axis was already indexed by the test
# history, but the y values came from the training histories; they now use
# models_test_accuracy_list.
# NOTE(review): confirm test accuracy is what this figure was meant to show.
fig = px.line()
test_steps = np.arange(len(overall_test_accuracy))
for label, history in zip(classifiers, models_test_accuracy_list):
    fig.add_scatter(x=test_steps, y=history, name=label)
fig.show()

# Final decision regions of each actively trained model on the test split.
for model in models:
    plot_cluster(x_test, y_test, model=model)