# import necessary libraries
# libraries for data visualization
import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt
# Point matplotlib at the ffmpeg binary used when animations are encoded
# to video (see plotAnimation.show, which calls to_html5_video()).
# NOTE(review): the path is hard-coded; adjust it if ffmpeg lives elsewhere.
plt.rcParams['animation.ffmpeg_path'] = '/usr/bin/ffmpeg'
# libraries for data loading and processing
import numpy as np
import pandas as pd
from copy import deepcopy
# libraries for data preprocessing
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
# libraries for displaying animation
import matplotlib.animation as animation
from IPython import display
# library for ignoring warnings
import warnings
# Silence all warnings for the rest of the session.
warnings.filterwarnings('ignore')
Active Learning Classification
Active Learning
Active learning is a machine learning technique that involves iteratively selecting the most informative samples from a large unlabeled dataset and adding them to a smaller labeled dataset. The goal of active learning is to reduce the amount of labeled data required for training while maintaining or improving the performance of a machine learning model.
The active learning process typically involves the following steps:
Initialize the learning algorithm
Select unlabeled samples
Request labels
Incorporate labeled samples
Evaluate model performance
Active learning has been applied in various domains and has proven effective in scenarios where labeled data is limited or costly to obtain, such as text classification, image recognition, etc.
# Fix the NumPy seed so the synthetic dataset is reproducible.
np.random.seed(0)

# 500 points drawn uniformly from the square [-2, 2) x [-2, 2).
x = 4 * np.random.rand(500) - 2
y = 4 * np.random.rand(500) - 2

fig = px.scatter(x=x, y=y, width=600, height=600)
fig.show()

# Label is 1 when x and y share a sign (product > 0), else 0.
# Built in one shot instead of concatenating one row at a time, which
# copied the whole frame on every iteration.
df = pd.DataFrame({"x": x, "y": y, "val": (x * y > 0).astype(int)})

df
x | y | val | |
---|---|---|---|
0 | 0.195254 | -0.758477 | 0 |
1 | 0.860757 | -0.507861 | 0 |
2 | 0.411054 | 0.099882 | 1 |
3 | 0.179533 | 1.002380 | 1 |
4 | -0.305381 | -0.665970 | 1 |
... | ... | ... | ... |
495 | -0.913389 | -1.609295 | 1 |
496 | -0.178223 | 0.059689 | 0 |
497 | -0.393146 | 1.753648 | 0 |
498 | -1.006346 | -1.085414 | 1 |
499 | 0.023466 | 0.708565 | 1 |
500 rows × 3 columns
# Rebind x to the (n, 2) feature matrix and y to the integer label vector.
x = np.array(df[["x", "y"]])
y = np.array(df["val"]).astype(int)
def plot_cluster(x, y, model=None, samples=None, name=None):
    """Show a plotly figure of the data points over a class-region contour.

    Args:
        x: 2-column array of point coordinates (column 0 -> x axis,
            column 1 -> y axis).
        y: Labels for ``x``; used to colour the points when ``model`` is given.
        model: Optional fitted estimator exposing ``predict``. When ``None``
            the background shows the ground-truth rule (1 where the product
            of the coordinates is positive, else 0) and points are orange.
        samples: Optional 2-column array of extra points drawn in white.
        name: Optional figure title.
    """
    x_min, x_max = x[:, 0].min() - 0.5, x[:, 0].max() + 0.5
    y_min, y_max = x[:, 1].min() - 0.5, x[:, 1].max() + 0.5

    # Regular grid (step 0.05) covering the data with a 0.5 margin.
    grid_x = np.arange(x_min, x_max, 0.05)
    grid_y = np.arange(y_min, y_max, 0.05)
    xx, yy = np.meshgrid(grid_x, grid_y)

    if model is None:
        # Ground-truth rule evaluated on the whole grid at once.
        Z = (xx.ravel() * yy.ravel() > 0).astype(float)
    else:
        Z = model.predict(np.c_[xx.ravel(), yy.ravel()])

    Z = Z.reshape(xx.shape)

    fig = go.Figure()

    # Plot decision boundaries
    fig.add_trace(go.Contour(
        x=grid_x,
        y=grid_y,
        z=Z,
        colorscale='RdYlBu',
        contours_coloring='heatmap',
        opacity=0.75,
    ))

    if model is None:
        # Plot data points when no model is provided
        fig.add_trace(go.Scatter(
            x=x[:, 0],
            y=x[:, 1],
            mode='markers',
            marker=dict(color='orange', line=dict(color='black', width=1)),
        ))
    else:
        # Plot data points with labels when a model is provided
        fig.add_trace(go.Scatter(
            x=x[:, 0],
            y=x[:, 1],
            mode='markers',
            marker=dict(color=y, colorscale='RdYlBu', line=dict(color='black', width=1)),
        ))

    if samples is not None:
        # Plot additional samples if available
        fig.add_trace(go.Scatter(
            x=samples[:, 0],
            y=samples[:, 1],
            mode='markers',
            marker=dict(color='white', colorscale='RdYlBu', line=dict(color='black', width=1)),
        ))

    if name:
        fig.update_layout(title=name)

    fig.update_layout(
        xaxis=dict(range=(x_min, x_max)),
        yaxis=dict(range=(y_min, y_max)),
        width=700,
        height=600,
    )
    fig.show()
def plot_contour(x, model=None):
    """Return the grid and class values needed to draw a decision contour.

    Args:
        x: 2-column array of point coordinates; only its min/max per column
            are used, to size the grid (0.5 margin, step 0.05).
        model: Optional fitted estimator exposing ``predict``. When ``None``
            the ground-truth rule is used: 1 where the product of the two
            coordinates is positive, else 0.

    Returns:
        Tuple ``(xx, yy, Z)``: the two meshgrid arrays and the class value at
        every grid node, all with the same shape.
    """
    x_min, x_max = x[:, 0].min() - 0.5, x[:, 0].max() + 0.5
    y_min, y_max = x[:, 1].min() - 0.5, x[:, 1].max() + 0.5

    xx, yy = np.meshgrid(np.arange(x_min, x_max, 0.05), np.arange(y_min, y_max, 0.05))

    if model is None:
        # Vectorised form of the per-node "same sign" test.
        Z = (xx.ravel() * yy.ravel() > 0).astype(float)
    else:
        Z = model.predict(np.c_[xx.ravel(), yy.ravel()])

    Z = Z.reshape(xx.shape)
    return (xx, yy, Z)
def plot_accuracy(title, overall_accuracy, models_accuracy_list, classifiers, iteration):
    """Show a line chart of per-model and overall accuracy per iteration.

    Args:
        title: Figure title.
        overall_accuracy: Accuracy series plotted under the name "Overall".
        models_accuracy_list: One accuracy series per model.
        classifiers: Display names, in the same order as
            ``models_accuracy_list``.
        iteration: Number of points on the x axis (``np.arange(iteration)``);
            should equal the length of each accuracy series.
    """
    fig = px.line(title=title, labels={"x": "Iterations", "y": "Accuracy"})
    steps = np.arange(iteration)
    for label, series in zip(classifiers, models_accuracy_list):
        fig.add_scatter(x=steps, y=series, name=label)
    fig.add_scatter(x=steps, y=overall_accuracy, name="Overall")
    fig.show()
class plotAnimation:
    """Collects per-iteration snapshots and renders them as an HTML5 video.

    Call ``update`` once per iteration to record the current training set,
    the newly selected batch and the accuracies; call ``show`` at the end to
    build and display the animation.
    """

    def __init__(self, x, y, model=None, model_name=None):
        """Store the full dataset, the model to visualise and the axis limits.

        Args:
            x: 2-column array with every point of the dataset.
            y: Labels for ``x``.
            model: Optional estimator whose decision surface is drawn.
            model_name: Optional name prefixed to each frame title.
        """
        self.x_whole = x
        self.y_whole = y
        self.model = model
        self.model_name = model_name
        # Axis limits: data extent plus a 0.5 margin on every side.
        self.xlim = (x[:, 0].min() - 0.5, x[:, 0].max() + 0.5)
        self.ylim = (x[:, 1].min() - 0.5, x[:, 1].max() + 0.5)
        self.data = []      # one (x, y, samples, train_acc, test_acc) per frame
        self.contour = []   # one (xx, yy, Z) per frame

    def update(self, x, y, samples, train_acc, test_acc):
        """Record one frame.

        The contour is computed here, from the model's state at call time,
        not later when the animation is rendered.
        """
        self.data.append((x, y, samples, train_acc, test_acc))
        self.contour.append(plot_contour(self.x_whole, self.model))

    def animate(self, i):
        """Draw frame ``i`` on ``self.ax`` and return the axes.

        NOTE(review): artists from earlier frames are never removed, so
        they accumulate on the axes from frame to frame; confirm this is
        intended.
        """
        grid_x, grid_y, grid_z = self.contour[i]
        train_x, train_y, batch, train_acc, test_acc = self.data[i]

        self.ax.contourf(grid_x, grid_y, grid_z, cmap=plt.cm.RdYlBu, alpha=0.8)

        if self.model is None:
            # Plot the two coordinate columns; passing the (N, 2) array and
            # the label vector directly to scatter has mismatched sizes.
            self.ax.scatter(self.x_whole[:, 0], self.x_whole[:, 1], color="orange", edgecolors="k")
        else:
            scat = self.ax.scatter(train_x[:, 0], train_x[:, 1], c=train_y, cmap="RdYlBu", edgecolors="k")

            # Keep the class legend alive when the second legend is added.
            legend1 = self.ax.legend(*scat.legend_elements(), loc="upper right", title="Classes")
            self.ax.add_artist(legend1)

            scat = self.ax.scatter(batch[:, 0], batch[:, 1], color="white", label="Last Batch", edgecolors="k")
            self.ax.legend([scat], ["Last Batch"], loc="lower right")

        title = f"Iteration: {i}, Train Accuracy: {round(train_acc, 3)}, Test Accuracy: {round(test_acc, 3)}"
        if self.model_name:
            title = self.model_name + " " + title

        self.ax.set_title(title)
        return self.ax

    def show(self):
        """Render all recorded frames to an HTML5 video and display it."""
        self.fig, self.ax = plt.subplots()
        self.ax.set_xlabel("x")
        self.ax.set_ylabel("y")
        self.ax.set_xlim(self.xlim)
        self.ax.set_ylim(self.ylim)

        anim = animation.FuncAnimation(self.fig, self.animate, frames=len(self.data), interval=500, blit=False, repeat=False)
        video = anim.to_html5_video()
        html = display.HTML(video)
        display.display(html)
# Ground-truth view of the dataset (no model, no highlighted samples).
plot_cluster(x, y, None, None)

rf = RandomForestClassifier(n_estimators=100, random_state=1)

z = np.array(df["val"]).astype(int)

# 80 % train; the remaining 20 % is split evenly into validation and test.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=1)
x_val, x_test, y_val, y_test = train_test_split(x_test, y_test, test_size=0.5, random_state=1)

rf.fit(x_train, y_train)
RandomForestClassifier(random_state=1)In a Jupyter environment, please rerun this cell to show the HTML representation or trust the notebook.
On GitHub, the HTML representation is unable to render, please try loading this page with nbviewer.org.
RandomForestClassifier(random_state=1)
# Evaluate the fully supervised baseline on the held-out test split.
z_test = rf.predict(x_test)
plot_cluster(x_test, y_test, model=rf)
rf.score(x_test, y_test)
1.0
Starting Active Learning
# 1010 points drawn uniformly from the square [-2, 2) x [-2, 2).
x = 4 * np.random.rand(1010) - 2
y = 4 * np.random.rand(1010) - 2

# Label is 1 when x and y share a sign (product > 0), else 0.
# Built in one shot instead of concatenating one row at a time, which
# copied the whole frame on every iteration.
df = pd.DataFrame({"x": x, "y": y, "val": (x * y > 0).astype(int)})

df
x | y | val | |
---|---|---|---|
0 | 0.371521 | -0.292722 | 0 |
1 | -1.959745 | -0.927563 | 1 |
2 | -0.096695 | -1.473613 | 1 |
3 | 0.835082 | -1.843158 | 0 |
4 | -1.824098 | -1.899073 | 1 |
... | ... | ... | ... |
1005 | -0.788381 | -1.719306 | 1 |
1006 | 0.309136 | 0.003035 | 1 |
1007 | -1.321288 | 1.854398 | 0 |
1008 | -1.362124 | 0.447133 | 0 |
1009 | -0.331881 | -0.799873 | 1 |
1010 rows × 3 columns
# Rebind x to the (n, 2) feature matrix and y to the integer label vector.
x = np.array(df[["x", "y"]])
y = np.array(df["val"]).astype(int)

# 80 % train; the remaining 20 % is split evenly into validation and test.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=1)
x_val, x_test, y_val, y_test = train_test_split(x_test, y_test, test_size=0.5, random_state=1)

# Independent copies for the random-sampling baseline.
x_train_random = deepcopy(x_train)
y_train_random = deepcopy(y_train)

# Both strategies start from the same 10 labelled points ...
x_training = x_train[:10]
y_training = y_train[:10]

x_training_random = x_train_random[:10]
y_training_random = y_train_random[:10]

# ... and draw further points from the rest of the training split.
x_pool = x_train[10:]
y_pool = y_train[10:]

x_pool_random = x_train_random[10:]
y_pool_random = y_train_random[10:]
def entropy_sampling(stack):
    """Return the vote entropy (base 2) of every row of ``stack``.

    Args:
        stack: Iterable of 1-D arrays; each row holds the labels predicted
            for one sample (in this file, one entry per model).

    Returns:
        A list with one value per row: ``0`` when all entries of the row
        agree, otherwise ``-sum(p * log2(p))`` over the label frequencies.
    """
    entropy_list = []
    for votes in stack:
        unique_values, counts = np.unique(votes, return_counts=True)
        length = votes.shape[0]
        if len(unique_values) == 1:
            # Unanimous vote: no uncertainty.
            entropy_list.append(0)
        else:
            entropy = sum(count / length * np.log2(count / length) for count in counts)
            entropy_list.append(-entropy)
    return entropy_list
def select_samples(entropy_list, samples):
    """Return the indices of the ``samples`` largest entries of ``entropy_list``.

    Indices come back in ascending order of entropy (largest last). A
    non-positive ``samples`` yields an empty index array; slicing with
    ``[-0:]`` would otherwise return every index.
    """
    order = np.argsort(entropy_list)
    if samples <= 0:
        return order[:0]
    samples_idx = order[-samples:]
    return samples_idx
def output_list(stack):
    """Return the majority label of every row of ``stack`` as an array.

    Each row must contain non-negative integers (a requirement of
    ``np.bincount``). Ties resolve to the smallest label, because
    ``argmax`` returns the first maximum.
    """
    # Local name differs from the function name to avoid shadowing it.
    majority = np.array([int(np.bincount(votes).argmax()) for votes in stack])
    return majority
def output_accuracy_calc(output_list, y_val):
    """Return the percentage (0-100) of entries where the two inputs match.

    Both arguments are converted with ``np.asarray``, so plain sequences
    are accepted as well as arrays.
    """
    predicted = np.asarray(output_list)
    actual = np.asarray(y_val)
    return 100 * np.sum(predicted == actual) / actual.size
classifiers = ("Logistic Regression", "Random Forest", "Decision Tree")

# One committee for active learning and an identically configured one for
# the random-sampling baseline.
models = [LogisticRegression(), RandomForestClassifier(random_state=1), DecisionTreeClassifier(random_state=1)]
models_random = [LogisticRegression(), RandomForestClassifier(random_state=1), DecisionTreeClassifier(random_state=1)]

# One animation recorder per active-learning model.
anim_model = [plotAnimation(x, y, model=model, model_name=label) for model, label in zip(models, classifiers)]

# Number of points moved from the pool to the training set per iteration.
samples = 5

# Per-model accuracy histories (one inner list per classifier).
models_accuracy_list = [[], [], []]
models_accuracy_list_random = [[], [], []]

models_test_accuracy_list = [[], [], []]
models_test_accuracy_list_random = [[], [], []]

# Majority-vote (committee) accuracy histories.
overall_accuracy = []
overall_test_accuracy = []

overall_accuracy_random = []
overall_test_accuracy_random = []
from tqdm import trange

for iteration in trange(len(x_train) // samples):
    # Stop once either pool can no longer supply a full batch.
    if min(x_pool.shape[0], x_pool_random.shape[0]) < samples:
        break

    y_pred = []
    y_pred_train = []
    y_pred_test = []
    y_pred_train_random = []
    y_pred_test_random = []

    # Active-learning committee: refit on the current labelled set.
    for model_idx, model in enumerate(models):
        model.fit(x_training, y_training)

        pred_train = model.predict(x_train)
        models_accuracy_list[model_idx].append(accuracy_score(y_train, pred_train))

        pred_test = model.predict(x_test)
        models_test_accuracy_list[model_idx].append(accuracy_score(y_test, pred_test))

        y_pred.append(np.array(model.predict(x_pool)))
        y_pred_train.append(np.array(pred_train))
        y_pred_test.append(np.array(pred_test))

    # Random-sampling baseline committee.
    for model_idx, model in enumerate(models_random):
        model.fit(x_training_random, y_training_random)

        pred_train = model.predict(x_train_random)
        models_accuracy_list_random[model_idx].append(accuracy_score(y_train_random, pred_train))

        pred_test = model.predict(x_test)
        models_test_accuracy_list_random[model_idx].append(accuracy_score(y_test, pred_test))

        y_pred_train_random.append(np.array(pred_train))
        y_pred_test_random.append(np.array(pred_test))

    # Rows = samples, columns = models.
    stack = np.stack(y_pred, axis=-1)
    stack_train = np.stack(y_pred_train, axis=-1)
    stack_test = np.stack(y_pred_test, axis=-1)

    # Query the pool points on which the committee disagrees most.
    sample_idx = select_samples(entropy_sampling(stack), samples)

    stack_train_random = np.stack(y_pred_train_random, axis=-1)
    stack_test_random = np.stack(y_pred_test_random, axis=-1)

    # Draw distinct indices from the baseline's own pool. Sampling with
    # replacement could pick the same row twice, adding duplicates to the
    # training set while deleting fewer than `samples` rows from the pool.
    random_idx = np.random.choice(x_pool_random.shape[0], samples, replace=False)

    # Majority-vote accuracy of each committee.
    output_train = output_list(stack_train)
    overall_accuracy.append(accuracy_score(output_train, y_train))

    output_test = output_list(stack_test)
    overall_test_accuracy.append(accuracy_score(output_test, y_test))

    output_train_random = output_list(stack_train_random)
    overall_accuracy_random.append(accuracy_score(output_train_random, y_train_random))

    output_test_random = output_list(stack_test_random)
    overall_test_accuracy_random.append(accuracy_score(output_test_random, y_test))

    # Record this iteration's frame; [-1] is the accuracy appended above.
    for i, recorder in enumerate(anim_model):
        recorder.update(x_training, y_training, x_pool[sample_idx], models_accuracy_list[i][-1], models_test_accuracy_list[i][-1])

    # Move the selected points from the pools into the training sets.
    x_training = np.append(x_training, x_pool[sample_idx], axis=0)
    y_training = np.append(y_training, y_pool[sample_idx], axis=0)

    x_training_random = np.append(x_training_random, x_pool_random[random_idx], axis=0)
    y_training_random = np.append(y_training_random, y_pool_random[random_idx], axis=0)

    x_pool = np.delete(x_pool, sample_idx, axis=0)
    y_pool = np.delete(y_pool, sample_idx, axis=0)

    x_pool_random = np.delete(x_pool_random, random_idx, axis=0)
    y_pool_random = np.delete(y_pool_random, random_idx, axis=0)

for recorder in anim_model:
    recorder.show()

plt.close("all")
99%|█████████▉| 159/161 [01:23<00:01, 1.92it/s]
# Use the recorded history length for the x axis: the loop variable only
# equals it when the loop ended through `break`.
plot_accuracy(title="Training Accuracy", overall_accuracy=overall_accuracy, models_accuracy_list=models_accuracy_list, classifiers=classifiers, iteration=len(overall_accuracy))

# Committee training accuracy: active learning vs. random sampling.
fig = px.line()
fig.add_scatter(x=np.arange(len(overall_accuracy)), y=overall_accuracy, name="Active Learning")
fig.add_scatter(x=np.arange(len(overall_accuracy_random)), y=overall_accuracy_random, name="Random Sampling")

fig.show()

# NOTE(review): the x axis is sized from overall_test_accuracy but the y
# values are the per-model *training* accuracies; confirm whether
# models_test_accuracy_list was intended here.
fig = px.line()
for label, series in zip(classifiers, models_accuracy_list):
    fig.add_scatter(x=np.arange(len(overall_test_accuracy)), y=series, name=label)

fig.show()

# Final decision regions of every active-learning model on the test split.
for model in models:
    plot_cluster(x_test, y_test, model=model)