Source code for d2l.torch
Open the notebook in Colab
Open the notebook in Colab
Open the notebook in Colab

# This file is generated automatically through:
#    d2lbook build lib
# Don't edit it directly

# Defined in file: ./chapter_preface/index.md
import collections
import hashlib
import math
import os
import random
import re
import shutil
import sys
import tarfile
import time
import zipfile
from collections import defaultdict

import pandas as pd
import requests
from IPython import display
from matplotlib import pyplot as plt

d2l = sys.modules[__name__]


# Defined in file: ./chapter_preface/index.md
import numpy as np
import torch
import torchvision
from PIL import Image
from torch import nn
from torch.nn import functional as F
from torch.utils import data
from torchvision import transforms


# Defined in file: ./chapter_preliminaries/calculus.md
[docs]def use_svg_display(): """Use the svg format to display a plot in Jupyter.""" display.set_matplotlib_formats('svg')
# Defined in file: ./chapter_preliminaries/calculus.md
[docs]def set_figsize(figsize=(3.5, 2.5)): """Set the figure size for matplotlib.""" use_svg_display() d2l.plt.rcParams['figure.figsize'] = figsize
# Defined in file: ./chapter_preliminaries/calculus.md
[docs]def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend): """Set the axes for matplotlib.""" axes.set_xlabel(xlabel) axes.set_ylabel(ylabel) axes.set_xscale(xscale) axes.set_yscale(yscale) axes.set_xlim(xlim) axes.set_ylim(ylim) if legend: axes.legend(legend) axes.grid()
# Defined in file: ./chapter_preliminaries/calculus.md
[docs]def plot(X, Y=None, xlabel=None, ylabel=None, legend=None, xlim=None, ylim=None, xscale='linear', yscale='linear', fmts=('-', 'm--', 'g-.', 'r:'), figsize=(3.5, 2.5), axes=None): """Plot data points.""" if legend is None: legend = [] set_figsize(figsize) axes = axes if axes else d2l.plt.gca() # Return True if `X` (tensor or list) has 1 axis def has_one_axis(X): return (hasattr(X, "ndim") and X.ndim == 1 or isinstance(X, list) and not hasattr(X[0], "__len__")) if has_one_axis(X): X = [X] if Y is None: X, Y = [[]] * len(X), X elif has_one_axis(Y): Y = [Y] if len(X) != len(Y): X = X * len(Y) axes.cla() for x, y, fmt in zip(X, Y, fmts): if len(x): axes.plot(x, y, fmt) else: axes.plot(y, fmt) set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
# Defined in file: ./chapter_linear-networks/linear-regression.md
[docs]class Timer: """Record multiple running times.""" def __init__(self): self.times = [] self.start()
[docs] def start(self): """Start the timer.""" self.tik = time.time()
[docs] def stop(self): """Stop the timer and record the time in a list.""" self.times.append(time.time() - self.tik) return self.times[-1]
[docs] def avg(self): """Return the average time.""" return sum(self.times) / len(self.times)
[docs] def sum(self): """Return the sum of time.""" return sum(self.times)
[docs] def cumsum(self): """Return the accumulated time.""" return np.array(self.times).cumsum().tolist()
# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
[docs]def synthetic_data(w, b, num_examples): """Generate y = Xw + b + noise.""" X = d2l.normal(0, 1, (num_examples, len(w))) y = d2l.matmul(X, w) + b y += d2l.normal(0, 0.01, y.shape) return X, d2l.reshape(y, (-1, 1))
# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
[docs]def linreg(X, w, b): """The linear regression model.""" return d2l.matmul(X, w) + b
# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
[docs]def squared_loss(y_hat, y): """Squared loss.""" return (y_hat - d2l.reshape(y, y_hat.shape))**2 / 2
# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
[docs]def sgd(params, lr, batch_size): """Minibatch stochastic gradient descent.""" with torch.no_grad(): for param in params: param -= lr * param.grad / batch_size param.grad.zero_()
# Defined in file: ./chapter_linear-networks/linear-regression-concise.md
[docs]def load_array(data_arrays, batch_size, is_train=True): """Construct a PyTorch data iterator.""" dataset = data.TensorDataset(*data_arrays) return data.DataLoader(dataset, batch_size, shuffle=is_train)
# Defined in file: ./chapter_linear-networks/image-classification-dataset.md
[docs]def get_fashion_mnist_labels(labels): """Return text labels for the Fashion-MNIST dataset.""" text_labels = [ 't-shirt', 'trouser', 'pullover', 'dress', 'coat', 'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot'] return [text_labels[int(i)] for i in labels]
# Defined in file: ./chapter_linear-networks/image-classification-dataset.md
[docs]def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5): """Plot a list of images.""" figsize = (num_cols * scale, num_rows * scale) _, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize) axes = axes.flatten() for i, (ax, img) in enumerate(zip(axes, imgs)): if torch.is_tensor(img): # Tensor Image ax.imshow(img.numpy()) else: # PIL Image ax.imshow(img) ax.axes.get_xaxis().set_visible(False) ax.axes.get_yaxis().set_visible(False) if titles: ax.set_title(titles[i]) return axes
# Defined in file: ./chapter_linear-networks/image-classification-dataset.md
[docs]def get_dataloader_workers(): """Use 4 processes to read the data.""" return 4
# Defined in file: ./chapter_linear-networks/image-classification-dataset.md
[docs]def load_data_fashion_mnist(batch_size, resize=None): """Download the Fashion-MNIST dataset and then load it into memory.""" trans = [transforms.ToTensor()] if resize: trans.insert(0, transforms.Resize(resize)) trans = transforms.Compose(trans) mnist_train = torchvision.datasets.FashionMNIST(root="../data", train=True, transform=trans, download=True) mnist_test = torchvision.datasets.FashionMNIST(root="../data", train=False, transform=trans, download=True) return (data.DataLoader(mnist_train, batch_size, shuffle=True, num_workers=get_dataloader_workers()), data.DataLoader(mnist_test, batch_size, shuffle=False, num_workers=get_dataloader_workers()))
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
[docs]def accuracy(y_hat, y): """Compute the number of correct predictions.""" if len(y_hat.shape) > 1 and y_hat.shape[1] > 1: y_hat = d2l.argmax(y_hat, axis=1) cmp = d2l.astype(y_hat, y.dtype) == y return float(d2l.reduce_sum(d2l.astype(cmp, y.dtype)))
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
[docs]def evaluate_accuracy(net, data_iter): """Compute the accuracy for a model on a dataset.""" if isinstance(net, torch.nn.Module): net.eval() # Set the model to evaluation mode metric = Accumulator(2) # No. of correct predictions, no. of predictions for X, y in data_iter: metric.add(accuracy(net(X), y), d2l.size(y)) return metric[0] / metric[1]
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
[docs]class Accumulator: """For accumulating sums over `n` variables.""" def __init__(self, n): self.data = [0.0] * n
[docs] def add(self, *args): self.data = [a + float(b) for a, b in zip(self.data, args)]
[docs] def reset(self): self.data = [0.0] * len(self.data)
def __getitem__(self, idx): return self.data[idx]
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
[docs]def train_epoch_ch3(net, train_iter, loss, updater): """The training loop defined in Chapter 3.""" # Set the model to training mode if isinstance(net, torch.nn.Module): net.train() # Sum of training loss, sum of training accuracy, no. of examples metric = Accumulator(3) for X, y in train_iter: # Compute gradients and update parameters y_hat = net(X) l = loss(y_hat, y) if isinstance(updater, torch.optim.Optimizer): # Using PyTorch in-built optimizer & loss criterion updater.zero_grad() l.backward() updater.step() metric.add( float(l) * len(y), accuracy(y_hat, y), y.size().numel()) else: # Using custom built optimizer & loss criterion l.sum().backward() updater(X.shape[0]) metric.add(float(l.sum()), accuracy(y_hat, y), y.numel()) # Return training loss and training accuracy return metric[0] / metric[2], metric[1] / metric[2]
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
[docs]class Animator: """For plotting data in animation.""" def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None, ylim=None, xscale='linear', yscale='linear', fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1, figsize=(3.5, 2.5)): # Incrementally plot multiple lines if legend is None: legend = [] d2l.use_svg_display() self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize) if nrows * ncols == 1: self.axes = [self.axes,] # Use a lambda function to capture arguments self.config_axes = lambda: d2l.set_axes(self.axes[ 0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend) self.X, self.Y, self.fmts = None, None, fmts
[docs] def add(self, x, y): # Add multiple data points into the figure if not hasattr(y, "__len__"): y = [y] n = len(y) if not hasattr(x, "__len__"): x = [x] * n if not self.X: self.X = [[] for _ in range(n)] if not self.Y: self.Y = [[] for _ in range(n)] for i, (a, b) in enumerate(zip(x, y)): if a is not None and b is not None: self.X[i].append(a) self.Y[i].append(b) self.axes[0].cla() for x, y, fmt in zip(self.X, self.Y, self.fmts): self.axes[0].plot(x, y, fmt) self.config_axes() display.display(self.fig) display.clear_output(wait=True)
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
[docs]def train_ch3(net, train_iter, test_iter, loss, num_epochs, updater): """Train a model (defined in Chapter 3).""" animator = Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0.3, 0.9], legend=['train loss', 'train acc', 'test acc']) for epoch in range(num_epochs): train_metrics = train_epoch_ch3(net, train_iter, loss, updater) test_acc = evaluate_accuracy(net, test_iter) animator.add(epoch + 1, train_metrics + (test_acc,)) train_loss, train_acc = train_metrics assert train_loss < 0.5, train_loss assert train_acc <= 1 and train_acc > 0.7, train_acc assert test_acc <= 1 and test_acc > 0.7, test_acc
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
[docs]def predict_ch3(net, test_iter, n=6): """Predict labels (defined in Chapter 3).""" for X, y in test_iter: break trues = d2l.get_fashion_mnist_labels(y) preds = d2l.get_fashion_mnist_labels(d2l.argmax(net(X), axis=1)) titles = [true + '\n' + pred for true, pred in zip(trues, preds)] d2l.show_images(d2l.reshape(X[0:n], (n, 28, 28)), 1, n, titles=titles[0:n])
# Defined in file: ./chapter_multilayer-perceptrons/underfit-overfit.md
[docs]def evaluate_loss(net, data_iter, loss): """Evaluate the loss of a model on the given dataset.""" metric = d2l.Accumulator(2) # Sum of losses, no. of examples for X, y in data_iter: out = net(X) y = d2l.reshape(y, out.shape) l = loss(out, y) metric.add(d2l.reduce_sum(l), d2l.size(l)) return metric[0] / metric[1]
# Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md DATA_HUB = dict() DATA_URL = 'http://d2l-data.s3-accelerate.amazonaws.com/' # Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md
[docs]def download(name, cache_dir=os.path.join('..', 'data')): """Download a file inserted into DATA_HUB, return the local filename.""" assert name in DATA_HUB, f"{name} does not exist in {DATA_HUB}." url, sha1_hash = DATA_HUB[name] os.makedirs(cache_dir, exist_ok=True) fname = os.path.join(cache_dir, url.split('/')[-1]) if os.path.exists(fname): sha1 = hashlib.sha1() with open(fname, 'rb') as f: while True: data = f.read(1048576) if not data: break sha1.update(data) if sha1.hexdigest() == sha1_hash: return fname # Hit cache print(f'Downloading {fname} from {url}...') r = requests.get(url, stream=True, verify=True) with open(fname, 'wb') as f: f.write(r.content) return fname
# Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md
[docs]def download_extract(name, folder=None): """Download and extract a zip/tar file.""" fname = download(name) base_dir = os.path.dirname(fname) data_dir, ext = os.path.splitext(fname) if ext == '.zip': fp = zipfile.ZipFile(fname, 'r') elif ext in ('.tar', '.gz'): fp = tarfile.open(fname, 'r') else: assert False, 'Only zip/tar files can be extracted.' fp.extractall(base_dir) return os.path.join(base_dir, folder) if folder else data_dir
[docs]def download_all(): """Download all files in the DATA_HUB.""" for name in DATA_HUB: download(name)
# Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md DATA_HUB['kaggle_house_train'] = (DATA_URL + 'kaggle_house_pred_train.csv', '585e9cc93e70b39160e7921475f9bcd7d31219ce') DATA_HUB['kaggle_house_test'] = (DATA_URL + 'kaggle_house_pred_test.csv', 'fa19780a7b011d9b009e8bff8e99922a8ee2eb90') # Defined in file: ./chapter_deep-learning-computation/use-gpu.md
[docs]def try_gpu(i=0): """Return gpu(i) if exists, otherwise return cpu().""" if torch.cuda.device_count() >= i + 1: return torch.device(f'cuda:{i}') return torch.device('cpu')
[docs]def try_all_gpus(): """Return all available GPUs, or [cpu(),] if no GPU exists.""" devices = [ torch.device(f'cuda:{i}') for i in range(torch.cuda.device_count())] return devices if devices else [torch.device('cpu')]
# Defined in file: ./chapter_convolutional-neural-networks/conv-layer.md
[docs]def corr2d(X, K): """Compute 2D cross-correlation.""" h, w = K.shape Y = d2l.zeros((X.shape[0] - h + 1, X.shape[1] - w + 1)) for i in range(Y.shape[0]): for j in range(Y.shape[1]): Y[i, j] = d2l.reduce_sum((X[i:i + h, j:j + w] * K)) return Y
# Defined in file: ./chapter_convolutional-neural-networks/lenet.md
[docs]def evaluate_accuracy_gpu(net, data_iter, device=None): """Compute the accuracy for a model on a dataset using a GPU.""" if isinstance(net, torch.nn.Module): net.eval() # Set the model to evaluation mode if not device: device = next(iter(net.parameters())).device # No. of correct predictions, no. of predictions metric = d2l.Accumulator(2) for X, y in data_iter: if isinstance(X, list): # Required for BERT Fine-tuning (to be covered later) X = [x.to(device) for x in X] else: X = X.to(device) y = y.to(device) metric.add(d2l.accuracy(net(X), y), d2l.size(y)) return metric[0] / metric[1]
# Defined in file: ./chapter_convolutional-neural-networks/lenet.md
[docs]def train_ch6(net, train_iter, test_iter, num_epochs, lr, device): """Train a model with a GPU (defined in Chapter 6).""" def init_weights(m): if type(m) == nn.Linear or type(m) == nn.Conv2d: nn.init.xavier_uniform_(m.weight) net.apply(init_weights) print('training on', device) net.to(device) optimizer = torch.optim.SGD(net.parameters(), lr=lr) loss = nn.CrossEntropyLoss() animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs], legend=['train loss', 'train acc', 'test acc']) timer, num_batches = d2l.Timer(), len(train_iter) for epoch in range(num_epochs): # Sum of training loss, sum of training accuracy, no. of examples metric = d2l.Accumulator(3) net.train() for i, (X, y) in enumerate(train_iter): timer.start() optimizer.zero_grad() X, y = X.to(device), y.to(device) y_hat = net(X) l = loss(y_hat, y) l.backward() optimizer.step() with torch.no_grad(): metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0]) timer.stop() train_l = metric[0] / metric[2] train_acc = metric[1] / metric[2] if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1: animator.add(epoch + (i + 1) / num_batches, (train_l, train_acc, None)) test_acc = evaluate_accuracy_gpu(net, test_iter) animator.add(epoch + 1, (None, None, test_acc)) print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, ' f'test acc {test_acc:.3f}') print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec ' f'on {str(device)}')
# Defined in file: ./chapter_convolutional-modern/resnet.md
[docs]class Residual(nn.Module): """The Residual block of ResNet.""" def __init__(self, input_channels, num_channels, use_1x1conv=False, strides=1): super().__init__() self.conv1 = nn.Conv2d(input_channels, num_channels, kernel_size=3, padding=1, stride=strides) self.conv2 = nn.Conv2d(num_channels, num_channels, kernel_size=3, padding=1) if use_1x1conv: self.conv3 = nn.Conv2d(input_channels, num_channels, kernel_size=1, stride=strides) else: self.conv3 = None self.bn1 = nn.BatchNorm2d(num_channels) self.bn2 = nn.BatchNorm2d(num_channels) self.relu = nn.ReLU(inplace=True)
[docs] def forward(self, X): Y = F.relu(self.bn1(self.conv1(X))) Y = self.bn2(self.conv2(Y)) if self.conv3: X = self.conv3(X) Y += X return F.relu(Y)
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt', '090b5e7e70c295757f55df93cb0a180b9691891a')
[docs]def read_time_machine(): """Load the time machine dataset into a list of text lines.""" with open(d2l.download('time_machine'), 'r') as f: lines = f.readlines() return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md
[docs]def tokenize(lines, token='word'): """Split text lines into word or character tokens.""" if token == 'word': return [line.split() for line in lines] elif token == 'char': return [list(line) for line in lines] else: print('ERROR: unknown token type: ' + token)
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md
[docs]class Vocab: """Vocabulary for text.""" def __init__(self, tokens=None, min_freq=0, reserved_tokens=None): if tokens is None: tokens = [] if reserved_tokens is None: reserved_tokens = [] # Sort according to frequencies counter = count_corpus(tokens) self.token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True) # The index for the unknown token is 0 self.unk, uniq_tokens = 0, ['<unk>'] + reserved_tokens uniq_tokens += [ token for token, freq in self.token_freqs if freq >= min_freq and token not in uniq_tokens] self.idx_to_token, self.token_to_idx = [], dict() for token in uniq_tokens: self.idx_to_token.append(token) self.token_to_idx[token] = len(self.idx_to_token) - 1 def __len__(self): return len(self.idx_to_token) def __getitem__(self, tokens): if not isinstance(tokens, (list, tuple)): return self.token_to_idx.get(tokens, self.unk) return [self.__getitem__(token) for token in tokens]
[docs] def to_tokens(self, indices): if not isinstance(indices, (list, tuple)): return self.idx_to_token[indices] return [self.idx_to_token[index] for index in indices]
[docs]def count_corpus(tokens): """Count token frequencies.""" # Here `tokens` is a 1D list or 2D list if len(tokens) == 0 or isinstance(tokens[0], list): # Flatten a list of token lists into a list of tokens tokens = [token for line in tokens for token in line] return collections.Counter(tokens)
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md
[docs]def load_corpus_time_machine(max_tokens=-1): """Return token indices and the vocabulary of the time machine dataset.""" lines = read_time_machine() tokens = tokenize(lines, 'char') vocab = Vocab(tokens) # Since each text line in the time machine dataset is not necessarily a # sentence or a paragraph, flatten all the text lines into a single list corpus = [vocab[token] for line in tokens for token in line] if max_tokens > 0: corpus = corpus[:max_tokens] return corpus, vocab
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
[docs]def seq_data_iter_random(corpus, batch_size, num_steps): """Generate a minibatch of subsequences using random sampling.""" # Start with a random offset (inclusive of `num_steps - 1`) to partition a # sequence corpus = corpus[random.randint(0, num_steps - 1):] # Subtract 1 since we need to account for labels num_subseqs = (len(corpus) - 1) // num_steps # The starting indices for subsequences of length `num_steps` initial_indices = list(range(0, num_subseqs * num_steps, num_steps)) # In random sampling, the subsequences from two adjacent random # minibatches during iteration are not necessarily adjacent on the # original sequence random.shuffle(initial_indices) def data(pos): # Return a sequence of length `num_steps` starting from `pos` return corpus[pos:pos + num_steps] num_batches = num_subseqs // batch_size for i in range(0, batch_size * num_batches, batch_size): # Here, `initial_indices` contains randomized starting indices for # subsequences initial_indices_per_batch = initial_indices[i:i + batch_size] X = [data(j) for j in initial_indices_per_batch] Y = [data(j + 1) for j in initial_indices_per_batch] yield d2l.tensor(X), d2l.tensor(Y)
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
[docs]def seq_data_iter_sequential(corpus, batch_size, num_steps): """Generate a minibatch of subsequences using sequential partitioning.""" # Start with a random offset to partition a sequence offset = random.randint(0, num_steps) num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size Xs = d2l.tensor(corpus[offset:offset + num_tokens]) Ys = d2l.tensor(corpus[offset + 1:offset + 1 + num_tokens]) Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1) num_batches = Xs.shape[1] // num_steps for i in range(0, num_steps * num_batches, num_steps): X = Xs[:, i:i + num_steps] Y = Ys[:, i:i + num_steps] yield X, Y
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
[docs]class SeqDataLoader: """An iterator to load sequence data.""" def __init__(self, batch_size, num_steps, use_random_iter, max_tokens): if use_random_iter: self.data_iter_fn = d2l.seq_data_iter_random else: self.data_iter_fn = d2l.seq_data_iter_sequential self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens) self.batch_size, self.num_steps = batch_size, num_steps def __iter__(self): return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
[docs]def load_data_time_machine(batch_size, num_steps, use_random_iter=False, max_tokens=10000): """Return the iterator and the vocabulary of the time machine dataset.""" data_iter = SeqDataLoader(batch_size, num_steps, use_random_iter, max_tokens) return data_iter, data_iter.vocab
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
[docs]class RNNModelScratch: """A RNN Model implemented from scratch.""" def __init__(self, vocab_size, num_hiddens, device, get_params, init_state, forward_fn): self.vocab_size, self.num_hiddens = vocab_size, num_hiddens self.params = get_params(vocab_size, num_hiddens, device) self.init_state, self.forward_fn = init_state, forward_fn def __call__(self, X, state): X = F.one_hot(X.T, self.vocab_size).type(torch.float32) return self.forward_fn(X, state, self.params)
[docs] def begin_state(self, batch_size, device): return self.init_state(batch_size, self.num_hiddens, device)
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
[docs]def predict_ch8(prefix, num_preds, net, vocab, device): """Generate new characters following the `prefix`.""" state = net.begin_state(batch_size=1, device=device) outputs = [vocab[prefix[0]]] get_input = lambda: d2l.reshape(d2l.tensor([outputs[-1]], device=device), (1, 1)) for y in prefix[1:]: # Warm-up period _, state = net(get_input(), state) outputs.append(vocab[y]) for _ in range(num_preds): # Predict `num_preds` steps y, state = net(get_input(), state) outputs.append(int(y.argmax(dim=1).reshape(1))) return ''.join([vocab.idx_to_token[i] for i in outputs])
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
[docs]def grad_clipping(net, theta): """Clip the gradient.""" if isinstance(net, nn.Module): params = [p for p in net.parameters() if p.requires_grad] else: params = net.params norm = torch.sqrt(sum(torch.sum((p.grad**2)) for p in params)) if norm > theta: for param in params: param.grad[:] *= theta / norm
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
[docs]def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter): """Train a net within one epoch (defined in Chapter 8).""" state, timer = None, d2l.Timer() metric = d2l.Accumulator(2) # Sum of training loss, no. of tokens for X, Y in train_iter: if state is None or use_random_iter: # Initialize `state` when either it is the first iteration or # using random sampling state = net.begin_state(batch_size=X.shape[0], device=device) else: if isinstance(net, nn.Module) and not isinstance(state, tuple): # `state` is a tensor for `nn.GRU` state.detach_() else: # `state` is a tuple of tensors for `nn.LSTM` and # for our custom scratch implementation for s in state: s.detach_() y = Y.T.reshape(-1) X, y = X.to(device), y.to(device) y_hat, state = net(X, state) l = loss(y_hat, y.long()).mean() if isinstance(updater, torch.optim.Optimizer): updater.zero_grad() l.backward() grad_clipping(net, 1) updater.step() else: l.backward() grad_clipping(net, 1) # Since the `mean` function has been invoked updater(batch_size=1) metric.add(l * d2l.size(y), d2l.size(y)) return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
[docs]def train_ch8(net, train_iter, vocab, lr, num_epochs, device, use_random_iter=False): """Train a model (defined in Chapter 8).""" loss = nn.CrossEntropyLoss() animator = d2l.Animator(xlabel='epoch', ylabel='perplexity', legend=['train'], xlim=[10, num_epochs]) # Initialize if isinstance(net, nn.Module): updater = torch.optim.SGD(net.parameters(), lr) else: updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size) predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device) # Train and predict for epoch in range(num_epochs): ppl, speed = train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter) if (epoch + 1) % 10 == 0: print(predict('time traveller')) animator.add(epoch + 1, [ppl]) print(f'perplexity {ppl:.1f}, {speed:.1f} tokens/sec on {str(device)}') print(predict('time traveller')) print(predict('traveller'))
# Defined in file: ./chapter_recurrent-neural-networks/rnn-concise.md
[docs]class RNNModel(nn.Module): """The RNN model.""" def __init__(self, rnn_layer, vocab_size, **kwargs): super(RNNModel, self).__init__(**kwargs) self.rnn = rnn_layer self.vocab_size = vocab_size self.num_hiddens = self.rnn.hidden_size # If the RNN is bidirectional (to be introduced later), # `num_directions` should be 2, else it should be 1. if not self.rnn.bidirectional: self.num_directions = 1 self.linear = nn.Linear(self.num_hiddens, self.vocab_size) else: self.num_directions = 2 self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)
[docs] def forward(self, inputs, state): X = F.one_hot(inputs.T.long(), self.vocab_size) X = X.to(torch.float32) Y, state = self.rnn(X, state) # The fully connected layer will first change the shape of `Y` to # (`num_steps` * `batch_size`, `num_hiddens`). Its output shape is # (`num_steps` * `batch_size`, `vocab_size`). output = self.linear(Y.reshape((-1, Y.shape[-1]))) return output, state
[docs] def begin_state(self, device, batch_size=1): if not isinstance(self.rnn, nn.LSTM): # `nn.GRU` takes a tensor as hidden state return torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device) else: # `nn.LSTM` takes a tuple of hidden states return (torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device), torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device))
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip', '94646ad1522d915e7b0f9296181140edcf86a4f5')
[docs]def read_data_nmt(): """Load the English-French dataset.""" data_dir = d2l.download_extract('fra-eng') with open(os.path.join(data_dir, 'fra.txt'), 'r') as f: return f.read()
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
[docs]def preprocess_nmt(text): """Preprocess the English-French dataset.""" def no_space(char, prev_char): return char in set(',.!?') and prev_char != ' ' # Replace non-breaking space with space, and convert uppercase letters to # lowercase ones text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower() # Insert space between words and punctuation marks out = [ ' ' + char if i > 0 and no_space(char, text[i - 1]) else char for i, char in enumerate(text)] return ''.join(out)
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
[docs]def tokenize_nmt(text, num_examples=None): """Tokenize the English-French dataset.""" source, target = [], [] for i, line in enumerate(text.split('\n')): if num_examples and i > num_examples: break parts = line.split('\t') if len(parts) == 2: source.append(parts[0].split(' ')) target.append(parts[1].split(' ')) return source, target
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
[docs]def truncate_pad(line, num_steps, padding_token): """Truncate or pad sequences.""" if len(line) > num_steps: return line[:num_steps] # Truncate return line + [padding_token] * (num_steps - len(line)) # Pad
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
[docs]def build_array_nmt(lines, vocab, num_steps): """Transform text sequences of machine translation into minibatches.""" lines = [vocab[l] for l in lines] lines = [l + [vocab['<eos>']] for l in lines] array = d2l.tensor([ truncate_pad(l, num_steps, vocab['<pad>']) for l in lines]) valid_len = d2l.reduce_sum(d2l.astype(array != vocab['<pad>'], d2l.int32), 1) return array, valid_len
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
[docs]def load_data_nmt(batch_size, num_steps, num_examples=600): """Return the iterator and the vocabularies of the translation dataset.""" text = preprocess_nmt(read_data_nmt()) source, target = tokenize_nmt(text, num_examples) src_vocab = d2l.Vocab(source, min_freq=2, reserved_tokens=['<pad>', '<bos>', '<eos>']) tgt_vocab = d2l.Vocab(target, min_freq=2, reserved_tokens=['<pad>', '<bos>', '<eos>']) src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps) tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps) data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len) data_iter = d2l.load_array(data_arrays, batch_size) return data_iter, src_vocab, tgt_vocab
# Defined in file: ./chapter_recurrent-modern/encoder-decoder.md
[docs]class Encoder(nn.Module): """The base encoder interface for the encoder-decoder architecture.""" def __init__(self, **kwargs): super(Encoder, self).__init__(**kwargs)
[docs] def forward(self, X, *args): raise NotImplementedError
# Defined in file: ./chapter_recurrent-modern/encoder-decoder.md
[docs]class Decoder(nn.Module): """The base decoder interface for the encoder-decoder architecture.""" def __init__(self, **kwargs): super(Decoder, self).__init__(**kwargs)
[docs] def init_state(self, enc_outputs, *args): raise NotImplementedError
[docs] def forward(self, X, state): raise NotImplementedError
# Defined in file: ./chapter_recurrent-modern/encoder-decoder.md
[docs]class EncoderDecoder(nn.Module): """The base class for the encoder-decoder architecture.""" def __init__(self, encoder, decoder, **kwargs): super(EncoderDecoder, self).__init__(**kwargs) self.encoder = encoder self.decoder = decoder
[docs] def forward(self, enc_X, dec_X, *args): enc_outputs = self.encoder(enc_X, *args) dec_state = self.decoder.init_state(enc_outputs, *args) return self.decoder(dec_X, dec_state)
# Defined in file: ./chapter_recurrent-modern/seq2seq.md
[docs]class Seq2SeqEncoder(d2l.Encoder): """The RNN encoder for sequence to sequence learning.""" def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs): super(Seq2SeqEncoder, self).__init__(**kwargs) # Embedding layer self.embedding = nn.Embedding(vocab_size, embed_size) self.rnn = nn.GRU(embed_size, num_hiddens, num_layers, dropout=dropout)
[docs] def forward(self, X, *args): # The output `X` shape: (`batch_size`, `num_steps`, `embed_size`) X = self.embedding(X) # In RNN models, the first axis corresponds to time steps X = X.permute(1, 0, 2) # When state is not mentioned, it defaults to zeros output, state = self.rnn(X) # `output` shape: (`num_steps`, `batch_size`, `num_hiddens`) # `state` shape: (`num_layers`, `batch_size`, `num_hiddens`) return output, state
# Defined in file: ./chapter_recurrent-modern/seq2seq.md
[docs]def sequence_mask(X, valid_len, value=0): """Mask irrelevant entries in sequences.""" maxlen = X.size(1) mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None] X[~mask] = value return X
# Defined in file: ./chapter_recurrent-modern/seq2seq.md
[docs]class MaskedSoftmaxCELoss(nn.CrossEntropyLoss): """The softmax cross-entropy loss with masks.""" # `pred` shape: (`batch_size`, `num_steps`, `vocab_size`) # `label` shape: (`batch_size`, `num_steps`) # `valid_len` shape: (`batch_size`,)
[docs] def forward(self, pred, label, valid_len): weights = torch.ones_like(label) weights = sequence_mask(weights, valid_len) self.reduction = 'none' unweighted_loss = super(MaskedSoftmaxCELoss, self).forward(pred.permute(0, 2, 1), label) weighted_loss = (unweighted_loss * weights).mean(dim=1) return weighted_loss
# Defined in file: ./chapter_recurrent-modern/seq2seq.md
[docs]def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device): """Train a model for sequence to sequence.""" def xavier_init_weights(m): if type(m) == nn.Linear: nn.init.xavier_uniform_(m.weight) if type(m) == nn.GRU: for param in m._flat_weights_names: if "weight" in param: nn.init.xavier_uniform_(m._parameters[param]) net.apply(xavier_init_weights) net.to(device) optimizer = torch.optim.Adam(net.parameters(), lr=lr) loss = MaskedSoftmaxCELoss() net.train() animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[10, num_epochs]) for epoch in range(num_epochs): timer = d2l.Timer() metric = d2l.Accumulator(2) # Sum of training loss, no. of tokens for batch in data_iter: X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch] bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0], device=device).reshape(-1, 1) dec_input = d2l.concat([bos, Y[:, :-1]], 1) # Teacher forcing Y_hat, _ = net(X, dec_input, X_valid_len) l = loss(Y_hat, Y, Y_valid_len) l.sum().backward() # Make the loss scalar for `backward` d2l.grad_clipping(net, 1) num_tokens = Y_valid_len.sum() optimizer.step() with torch.no_grad(): metric.add(l.sum(), num_tokens) if (epoch + 1) % 10 == 0: animator.add(epoch + 1, (metric[0] / metric[1],)) print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} ' f'tokens/sec on {str(device)}')
# Defined in file: ./chapter_recurrent-modern/seq2seq.md
[docs]def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps, device, save_attention_weights=False): """Predict for sequence to sequence.""" # Set `net` to eval mode for inference net.eval() src_tokens = src_vocab[src_sentence.lower().split(' ')] + [ src_vocab['<eos>']] enc_valid_len = torch.tensor([len(src_tokens)], device=device) src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>']) # Add the batch axis enc_X = torch.unsqueeze( torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0) enc_outputs = net.encoder(enc_X, enc_valid_len) dec_state = net.decoder.init_state(enc_outputs, enc_valid_len) # Add the batch axis dec_X = torch.unsqueeze( torch.tensor([tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0) output_seq, attention_weight_seq = [], [] for _ in range(num_steps): Y, dec_state = net.decoder(dec_X, dec_state) # We use the token with the highest prediction likelihood as the input # of the decoder at the next time step dec_X = Y.argmax(dim=2) pred = dec_X.squeeze(dim=0).type(torch.int32).item() # Save attention weights (to be covered later) if save_attention_weights: attention_weight_seq.append(net.decoder.attention_weights) # Once the end-of-sequence token is predicted, the generation of the # output sequence is complete if pred == tgt_vocab['<eos>']: break output_seq.append(pred) return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq
# Defined in file: ./chapter_recurrent-modern/seq2seq.md
[docs]def bleu(pred_seq, label_seq, k): """Compute the BLEU.""" pred_tokens, label_tokens = pred_seq.split(' '), label_seq.split(' ') len_pred, len_label = len(pred_tokens), len(label_tokens) score = math.exp(min(0, 1 - len_label / len_pred)) for n in range(1, k + 1): num_matches, label_subs = 0, collections.defaultdict(int) for i in range(len_label - n + 1): label_subs[''.join(label_tokens[i:i + n])] += 1 for i in range(len_pred - n + 1): if label_subs[''.join(pred_tokens[i:i + n])] > 0: num_matches += 1 label_subs[''.join(pred_tokens[i:i + n])] -= 1 score *= math.pow(num_matches / (len_pred - n + 1), math.pow(0.5, n)) return score
# Defined in file: ./chapter_attention-mechanisms/attention-cues.md
[docs]def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5), cmap='Reds'): d2l.use_svg_display() num_rows, num_cols = matrices.shape[0], matrices.shape[1] fig, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize, sharex=True, sharey=True, squeeze=False) for i, (row_axes, row_matrices) in enumerate(zip(axes, matrices)): for j, (ax, matrix) in enumerate(zip(row_axes, row_matrices)): pcm = ax.imshow(d2l.numpy(matrix), cmap=cmap) if i == num_rows - 1: ax.set_xlabel(xlabel) if j == 0: ax.set_ylabel(ylabel) if titles: ax.set_title(titles[j]) fig.colorbar(pcm, ax=axes, shrink=0.6)
# Defined in file: ./chapter_attention-mechanisms/attention-scoring-functions.md
[docs]def masked_softmax(X, valid_lens): """Perform softmax operation by masking elements on the last axis.""" # `X`: 3D tensor, `valid_lens`: 1D or 2D tensor if valid_lens is None: return nn.functional.softmax(X, dim=-1) else: shape = X.shape if valid_lens.dim() == 1: valid_lens = torch.repeat_interleave(valid_lens, shape[1]) else: valid_lens = valid_lens.reshape(-1) # On the last axis, replace masked elements with a very large negative # value, whose exponentiation outputs 0 X = d2l.sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6) return nn.functional.softmax(X.reshape(shape), dim=-1)
# Defined in file: ./chapter_attention-mechanisms/attention-scoring-functions.md
[docs]class AdditiveAttention(nn.Module): def __init__(self, key_size, query_size, num_hiddens, dropout, **kwargs): super(AdditiveAttention, self).__init__(**kwargs) self.W_k = nn.Linear(key_size, num_hiddens, bias=False) self.W_q = nn.Linear(query_size, num_hiddens, bias=False) self.w_v = nn.Linear(num_hiddens, 1, bias=False) self.dropout = nn.Dropout(dropout)
[docs] def forward(self, queries, keys, values, valid_lens): queries, keys = self.W_q(queries), self.W_k(keys) # After dimension expansion, shape of `queries`: (`batch_size`, no. of # queries, 1, `num_hiddens`) and shape of `keys`: (`batch_size`, 1, # no. of key-value pairs, `num_hiddens`). Sum them up with # broadcasting features = queries.unsqueeze(2) + keys.unsqueeze(1) features = torch.tanh(features) # There is only one output of `self.w_v`, so we remove the last # one-dimensional entry from the shape. Shape of `scores`: # (`batch_size`, no. of queries, no. of key-value pairs) scores = self.w_v(features).squeeze(-1) self.attention_weights = masked_softmax(scores, valid_lens) # Shape of `values`: (`batch_size`, no. of key-value pairs, value # dimension) return torch.bmm(self.dropout(self.attention_weights), values)
# Defined in file: ./chapter_attention-mechanisms/attention-scoring-functions.md
[docs]class DotProductAttention(nn.Module): """Scaled dot product attention.""" def __init__(self, dropout, **kwargs): super(DotProductAttention, self).__init__(**kwargs) self.dropout = nn.Dropout(dropout) # Shape of `queries`: (`batch_size`, no. of queries, `d`) # Shape of `keys`: (`batch_size`, no. of key-value pairs, `d`) # Shape of `values`: (`batch_size`, no. of key-value pairs, value # dimension) # Shape of `valid_lens`: (`batch_size`,) or (`batch_size`, no. of queries)
[docs] def forward(self, queries, keys, values, valid_lens=None): d = queries.shape[-1] # Set `transpose_b=True` to swap the last two dimensions of `keys` scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d) self.attention_weights = masked_softmax(scores, valid_lens) return torch.bmm(self.dropout(self.attention_weights), values)
# Defined in file: ./chapter_attention-mechanisms/bahdanau-attention.md
[docs]class AttentionDecoder(d2l.Decoder): """The base attention-based decoder interface.""" def __init__(self, **kwargs): super(AttentionDecoder, self).__init__(**kwargs) @property def attention_weights(self): raise NotImplementedError
# Defined in file: ./chapter_attention-mechanisms/multihead-attention.md
[docs]class MultiHeadAttention(nn.Module): def __init__(self, key_size, query_size, value_size, num_hiddens, num_heads, dropout, bias=False, **kwargs): super(MultiHeadAttention, self).__init__(**kwargs) self.num_heads = num_heads self.attention = d2l.DotProductAttention(dropout) self.W_q = nn.Linear(query_size, num_hiddens, bias=bias) self.W_k = nn.Linear(key_size, num_hiddens, bias=bias) self.W_v = nn.Linear(value_size, num_hiddens, bias=bias) self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)
[docs] def forward(self, queries, keys, values, valid_lens): # Shape of `queries`, `keys`, or `values`: # (`batch_size`, no. of queries or key-value pairs, `num_hiddens`) # Shape of `valid_lens`: # (`batch_size`,) or (`batch_size`, no. of queries) # After transposing, shape of output `queries`, `keys`, or `values`: # (`batch_size` * `num_heads`, no. of queries or key-value pairs, # `num_hiddens` / `num_heads`) queries = transpose_qkv(self.W_q(queries), self.num_heads) keys = transpose_qkv(self.W_k(keys), self.num_heads) values = transpose_qkv(self.W_v(values), self.num_heads) if valid_lens is not None: # On axis 0, copy the first item (scalar or vector) for # `num_heads` times, then copy the next item, and so on valid_lens = torch.repeat_interleave(valid_lens, repeats=self.num_heads, dim=0) # Shape of `output`: (`batch_size` * `num_heads`, no. of queries, # `num_hiddens` / `num_heads`) output = self.attention(queries, keys, values, valid_lens) # Shape of `output_concat`: # (`batch_size`, no. of queries, `num_hiddens`) output_concat = transpose_output(output, self.num_heads) return self.W_o(output_concat)
# Defined in file: ./chapter_attention-mechanisms/multihead-attention.md
[docs]def transpose_qkv(X, num_heads): # Shape of input `X`: # (`batch_size`, no. of queries or key-value pairs, `num_hiddens`). # Shape of output `X`: # (`batch_size`, no. of queries or key-value pairs, `num_heads`, # `num_hiddens` / `num_heads`) X = X.reshape(X.shape[0], X.shape[1], num_heads, -1) # Shape of output `X`: # (`batch_size`, `num_heads`, no. of queries or key-value pairs, # `num_hiddens` / `num_heads`) X = X.permute(0, 2, 1, 3) # Shape of `output`: # (`batch_size` * `num_heads`, no. of queries or key-value pairs, # `num_hiddens` / `num_heads`) return X.reshape(-1, X.shape[2], X.shape[3])
[docs]def transpose_output(X, num_heads): """Reverse the operation of `transpose_qkv`""" X = X.reshape(-1, num_heads, X.shape[1], X.shape[2]) X = X.permute(0, 2, 1, 3) return X.reshape(X.shape[0], X.shape[1], -1)
# Defined in file: ./chapter_attention-mechanisms/self-attention-and-positional-encoding.md
[docs]class PositionalEncoding(nn.Module): def __init__(self, num_hiddens, dropout, max_len=1000): super(PositionalEncoding, self).__init__() self.dropout = nn.Dropout(dropout) # Create a long enough `P` self.P = d2l.zeros((1, max_len, num_hiddens)) X = d2l.arange(max_len, dtype=torch.float32).reshape( -1, 1) / torch.pow( 10000, torch.arange(0, num_hiddens, 2, dtype=torch.float32) / num_hiddens) self.P[:, :, 0::2] = torch.sin(X) self.P[:, :, 1::2] = torch.cos(X)
[docs] def forward(self, X): X = X + self.P[:, :X.shape[1], :].to(X.device) return self.dropout(X)
# Defined in file: ./chapter_attention-mechanisms/transformer.md
[docs]class PositionWiseFFN(nn.Module): def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs, **kwargs): super(PositionWiseFFN, self).__init__(**kwargs) self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens) self.relu = nn.ReLU() self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs)
[docs] def forward(self, X): return self.dense2(self.relu(self.dense1(X)))
# Defined in file: ./chapter_attention-mechanisms/transformer.md
[docs]class AddNorm(nn.Module): def __init__(self, normalized_shape, dropout, **kwargs): super(AddNorm, self).__init__(**kwargs) self.dropout = nn.Dropout(dropout) self.ln = nn.LayerNorm(normalized_shape)
[docs] def forward(self, X, Y): return self.ln(self.dropout(Y) + X)
# Defined in file: ./chapter_attention-mechanisms/transformer.md
[docs]class EncoderBlock(nn.Module): def __init__(self, key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, dropout, use_bias=False, **kwargs): super(EncoderBlock, self).__init__(**kwargs) self.attention = d2l.MultiHeadAttention(key_size, query_size, value_size, num_hiddens, num_heads, dropout, use_bias) self.addnorm1 = AddNorm(norm_shape, dropout) self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens, num_hiddens) self.addnorm2 = AddNorm(norm_shape, dropout)
[docs] def forward(self, X, valid_lens): Y = self.addnorm1(X, self.attention(X, X, X, valid_lens)) return self.addnorm2(Y, self.ffn(Y))
# Defined in file: ./chapter_attention-mechanisms/transformer.md
[docs]class TransformerEncoder(d2l.Encoder): def __init__(self, vocab_size, key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers, dropout, use_bias=False, **kwargs): super(TransformerEncoder, self).__init__(**kwargs) self.num_hiddens = num_hiddens self.embedding = nn.Embedding(vocab_size, num_hiddens) self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout) self.blks = nn.Sequential() for i in range(num_layers): self.blks.add_module( "block" + str(i), EncoderBlock(key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, dropout, use_bias))
[docs] def forward(self, X, valid_lens, *args): # Since positional encoding values are between -1 and 1, the embedding # values are multiplied by the square root of the embedding dimension # to rescale before they are summed up X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens)) self.attention_weights = [None] * len(self.blks) for i, blk in enumerate(self.blks): X = blk(X, valid_lens) self.attention_weights[ i] = blk.attention.attention.attention_weights return X
# Defined in file: ./chapter_optimization/optimization-intro.md
[docs]def annotate(text, xy, xytext): d2l.plt.gca().annotate(text, xy=xy, xytext=xytext, arrowprops=dict(arrowstyle='->'))
# Defined in file: ./chapter_optimization/gd.md
[docs]def train_2d(trainer, steps=20): """Optimize a 2-dim objective function with a customized trainer.""" # s1 and s2 are internal state variables and will # be used later in the chapter x1, x2, s1, s2 = -5, -2, 0, 0 results = [(x1, x2)] for i in range(steps): x1, x2, s1, s2 = trainer(x1, x2, s1, s2) results.append((x1, x2)) return results
[docs]def show_trace_2d(f, results): """Show the trace of 2D variables during optimization.""" d2l.set_figsize() d2l.plt.plot(*zip(*results), '-o', color='#ff7f0e') x1, x2 = d2l.meshgrid(d2l.arange(-5.5, 1.0, 0.1), d2l.arange(-3.0, 1.0, 0.1)) d2l.plt.contour(x1, x2, f(x1, x2), colors='#1f77b4') d2l.plt.xlabel('x1') d2l.plt.ylabel('x2')
# Defined in file: ./chapter_optimization/minibatch-sgd.md d2l.DATA_HUB['airfoil'] = (d2l.DATA_URL + 'airfoil_self_noise.dat', '76e5be1548fd8222e5074cf0faae75edff8cf93f')
[docs]def get_data_ch11(batch_size=10, n=1500): data = np.genfromtxt(d2l.download('airfoil'), dtype=np.float32, delimiter='\t') data = torch.from_numpy((data - data.mean(axis=0)) / data.std(axis=0)) data_iter = d2l.load_array((data[:n, :-1], data[:n, -1]), batch_size, is_train=True) return data_iter, data.shape[1] - 1
# Defined in file: ./chapter_optimization/minibatch-sgd.md
[docs]def train_ch11(trainer_fn, states, hyperparams, data_iter, feature_dim, num_epochs=2): # Initialization w = torch.normal(mean=0.0, std=0.01, size=(feature_dim, 1), requires_grad=True) b = torch.zeros((1), requires_grad=True) net, loss = lambda X: d2l.linreg(X, w, b), d2l.squared_loss # Train animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[0, num_epochs], ylim=[0.22, 0.35]) n, timer = 0, d2l.Timer() for _ in range(num_epochs): for X, y in data_iter: l = loss(net(X), y).mean() l.backward() trainer_fn([w, b], states, hyperparams) n += X.shape[0] if n % 200 == 0: timer.stop() animator.add(n / X.shape[0] / len(data_iter), (d2l.evaluate_loss(net, data_iter, loss),)) timer.start() print(f'loss: {animator.Y[0][-1]:.3f}, {timer.avg():.3f} sec/epoch') return timer.cumsum(), animator.Y[0]
# Defined in file: ./chapter_optimization/minibatch-sgd.md
[docs]def train_concise_ch11(trainer_fn, hyperparams, data_iter, num_epochs=4): # Initialization net = nn.Sequential(nn.Linear(5, 1)) def init_weights(m): if type(m) == nn.Linear: torch.nn.init.normal_(m.weight, std=0.01) net.apply(init_weights) optimizer = trainer_fn(net.parameters(), **hyperparams) loss = nn.MSELoss() # Note: L2 Loss = 1/2 * MSE Loss. PyTorch has MSE Loss which is slightly # different from MXNet's L2Loss by a factor of 2. Hence we halve the loss # value to get L2Loss in PyTorch animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[0, num_epochs], ylim=[0.22, 0.35]) n, timer = 0, d2l.Timer() for _ in range(num_epochs): for X, y in data_iter: optimizer.zero_grad() out = net(X) y = y.reshape(out.shape) l = loss(out, y) / 2 l.backward() optimizer.step() n += X.shape[0] if n % 200 == 0: timer.stop() animator.add(n / X.shape[0] / len(data_iter), (d2l.evaluate_loss(net, data_iter, loss) / 2,)) timer.start() print(f'loss: {animator.Y[0][-1]:.3f}, {timer.avg():.3f} sec/epoch')
# Defined in file: ./chapter_computational-performance/hybridize.md
[docs]class Benchmark: def __init__(self, description='Done'): self.description = description def __enter__(self): self.timer = d2l.Timer() return self def __exit__(self, *args): print(f'{self.description}: {self.timer.stop():.4f} sec')
# Defined in file: ./chapter_computational-performance/multiple-gpus.md
[docs]def split_batch(X, y, devices): """Split `X` and `y` into multiple devices.""" assert X.shape[0] == y.shape[0] return (nn.parallel.scatter(X, devices), nn.parallel.scatter(y, devices))
# Defined in file: ./chapter_computational-performance/multiple-gpus-concise.md
[docs]def resnet18(num_classes, in_channels=1): """A slightly modified ResNet-18 model.""" def resnet_block(in_channels, out_channels, num_residuals, first_block=False): blk = [] for i in range(num_residuals): if i == 0 and not first_block: blk.append( d2l.Residual(in_channels, out_channels, use_1x1conv=True, strides=2)) else: blk.append(d2l.Residual(out_channels, out_channels)) return nn.Sequential(*blk) # This model uses a smaller convolution kernel, stride, and padding and # removes the maximum pooling layer net = nn.Sequential( nn.Conv2d(in_channels, 64, kernel_size=3, stride=1, padding=1), nn.BatchNorm2d(64), nn.ReLU()) net.add_module("resnet_block1", resnet_block(64, 64, 2, first_block=True)) net.add_module("resnet_block2", resnet_block(64, 128, 2)) net.add_module("resnet_block3", resnet_block(128, 256, 2)) net.add_module("resnet_block4", resnet_block(256, 512, 2)) net.add_module("global_avg_pool", nn.AdaptiveAvgPool2d((1, 1))) net.add_module("fc", nn.Sequential(nn.Flatten(), nn.Linear(512, num_classes))) return net
# Defined in file: ./chapter_computer-vision/image-augmentation.md
[docs]def train_batch_ch13(net, X, y, loss, trainer, devices): if isinstance(X, list): # Required for BERT Fine-tuning (to be covered later) X = [x.to(devices[0]) for x in X] else: X = X.to(devices[0]) y = y.to(devices[0]) net.train() trainer.zero_grad() pred = net(X) l = loss(pred, y) l.sum().backward() trainer.step() train_loss_sum = l.sum() train_acc_sum = d2l.accuracy(pred, y) return train_loss_sum, train_acc_sum
# Defined in file: ./chapter_computer-vision/image-augmentation.md
[docs]def train_ch13(net, train_iter, test_iter, loss, trainer, num_epochs, devices=d2l.try_all_gpus()): timer, num_batches = d2l.Timer(), len(train_iter) animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0, 1], legend=['train loss', 'train acc', 'test acc']) net = nn.DataParallel(net, device_ids=devices).to(devices[0]) for epoch in range(num_epochs): # Store training_loss, training_accuracy, num_examples, num_features metric = d2l.Accumulator(4) for i, (features, labels) in enumerate(train_iter): timer.start() l, acc = train_batch_ch13(net, features, labels, loss, trainer, devices) metric.add(l, acc, labels.shape[0], labels.numel()) timer.stop() if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1: animator.add( epoch + (i + 1) / num_batches, (metric[0] / metric[2], metric[1] / metric[3], None)) test_acc = d2l.evaluate_accuracy_gpu(net, test_iter) animator.add(epoch + 1, (None, None, test_acc)) print(f'loss {metric[0] / metric[2]:.3f}, train acc ' f'{metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}') print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec on ' f'{str(devices)}')
# Defined in file: ./chapter_computer-vision/fine-tuning.md d2l.DATA_HUB['hotdog'] = (d2l.DATA_URL + 'hotdog.zip', 'fba480ffa8aa7e0febbb511d181409f899b9baa5') # Defined in file: ./chapter_computer-vision/bounding-box.md
[docs]def box_corner_to_center(boxes): """Convert from (upper_left, bottom_right) to (center, width, height)""" x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] cx = (x1 + x2) / 2 cy = (y1 + y2) / 2 w = x2 - x1 h = y2 - y1 boxes = d2l.stack((cx, cy, w, h), axis=-1) return boxes
[docs]def box_center_to_corner(boxes): """Convert from (center, width, height) to (upper_left, bottom_right)""" cx, cy, w, h = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] x1 = cx - 0.5 * w y1 = cy - 0.5 * h x2 = cx + 0.5 * w y2 = cy + 0.5 * h boxes = d2l.stack((x1, y1, x2, y2), axis=-1) return boxes
# Defined in file: ./chapter_computer-vision/bounding-box.md
[docs]def bbox_to_rect(bbox, color): """Convert bounding box to matplotlib format.""" # Convert the bounding box (top-left x, top-left y, bottom-right x, # bottom-right y) format to matplotlib format: ((upper-left x, # upper-left y), width, height) return d2l.plt.Rectangle(xy=(bbox[0], bbox[1]), width=bbox[2] - bbox[0], height=bbox[3] - bbox[1], fill=False, edgecolor=color, linewidth=2)
# Defined in file: ./chapter_computer-vision/anchor.md
[docs]def multibox_prior(data, sizes, ratios): in_height, in_width = data.shape[-2:] device, num_sizes, num_ratios = data.device, len(sizes), len(ratios) boxes_per_pixel = (num_sizes + num_ratios - 1) size_tensor = d2l.tensor(sizes, device=device) ratio_tensor = d2l.tensor(ratios, device=device) # Offsets are required to move the anchor to center of a pixel # Since pixel (height=1, width=1), we choose to offset our centers by 0.5 offset_h, offset_w = 0.5, 0.5 steps_h = 1.0 / in_height # Scaled steps in y axis steps_w = 1.0 / in_width # Scaled steps in x axis # Generate all center points for the anchor boxes center_h = (torch.arange(in_height, device=device) + offset_h) * steps_h center_w = (torch.arange(in_width, device=device) + offset_w) * steps_w shift_y, shift_x = torch.meshgrid(center_h, center_w) shift_y, shift_x = shift_y.reshape(-1), shift_x.reshape(-1) # Generate boxes_per_pixel number of heights and widths which are later # used to create anchor box corner coordinates (xmin, xmax, ymin, ymax) # cat (various sizes, first ratio) and (first size, various ratios) w = torch.cat((size_tensor * torch.sqrt(ratio_tensor[0]), sizes[0] * torch.sqrt(ratio_tensor[1:])))\ * in_height / in_width # handle rectangular inputs h = torch.cat((size_tensor / torch.sqrt(ratio_tensor[0]), sizes[0] / torch.sqrt(ratio_tensor[1:]))) # Divide by 2 to get half height and half width anchor_manipulations = torch.stack( (-w, -h, w, h)).T.repeat(in_height * in_width, 1) / 2 # Each center point will have boxes_per_pixel number of anchor boxes, so # generate grid of all anchor box centers with boxes_per_pixel repeats out_grid = torch.stack([shift_x, shift_y, shift_x, shift_y], dim=1).repeat_interleave(boxes_per_pixel, dim=0) output = out_grid + anchor_manipulations return output.unsqueeze(0)
# Defined in file: ./chapter_computer-vision/anchor.md
[docs]def show_bboxes(axes, bboxes, labels=None, colors=None): """Show bounding boxes.""" def _make_list(obj, default_values=None): if obj is None: obj = default_values elif not isinstance(obj, (list, tuple)): obj = [obj] return obj labels = _make_list(labels) colors = _make_list(colors, ['b', 'g', 'r', 'm', 'c']) for i, bbox in enumerate(bboxes): color = colors[i % len(colors)] rect = d2l.bbox_to_rect(d2l.numpy(bbox), color) axes.add_patch(rect) if labels and len(labels) > i: text_color = 'k' if color == 'w' else 'w' axes.text(rect.xy[0], rect.xy[1], labels[i], va='center', ha='center', fontsize=9, color=text_color, bbox=dict(facecolor=color, lw=0))
# Defined in file: ./chapter_computer-vision/anchor.md
[docs]def box_iou(boxes1, boxes2): """Compute IOU between two sets of boxes of shape (N,4) and (M,4).""" # Compute box areas box_area = lambda boxes: ((boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])) area1 = box_area(boxes1) area2 = box_area(boxes2) lt = torch.max(boxes1[:, None, :2], boxes2[:, :2]) # [N,M,2] rb = torch.min(boxes1[:, None, 2:], boxes2[:, 2:]) # [N,M,2] wh = (rb - lt).clamp(min=0) # [N,M,2] inter = wh[:, :, 0] * wh[:, :, 1] # [N,M] unioun = area1[:, None] + area2 - inter return inter / unioun
# Defined in file: ./chapter_computer-vision/anchor.md
[docs]def match_anchor_to_bbox(ground_truth, anchors, device, iou_threshold=0.5): """Assign ground-truth bounding boxes to anchor boxes similar to them.""" num_anchors, num_gt_boxes = anchors.shape[0], ground_truth.shape[0] # Element `x_ij` in the `i^th` row and `j^th` column is the IoU # of the anchor box `anc_i` to the ground-truth bounding box `box_j` jaccard = box_iou(anchors, ground_truth) # Initialize the tensor to hold assigned ground truth bbox for each anchor anchors_bbox_map = torch.full((num_anchors,), -1, dtype=torch.long, device=device) # Assign ground truth bounding box according to the threshold max_ious, indices = torch.max(jaccard, dim=1) anc_i = torch.nonzero(max_ious >= 0.5).reshape(-1) box_j = indices[max_ious >= 0.5] anchors_bbox_map[anc_i] = box_j # Find the largest iou for each bbox col_discard = torch.full((num_anchors,), -1) row_discard = torch.full((num_gt_boxes,), -1) for _ in range(num_gt_boxes): max_idx = torch.argmax(jaccard) box_idx = (max_idx % num_gt_boxes).long() anc_idx = (max_idx / num_gt_boxes).long() anchors_bbox_map[anc_idx] = box_idx jaccard[:, box_idx] = col_discard jaccard[anc_idx, :] = row_discard return anchors_bbox_map
# Defined in file: ./chapter_computer-vision/anchor.md
[docs]def offset_boxes(anchors, assigned_bb, eps=1e-6): c_anc = d2l.box_corner_to_center(anchors) c_assigned_bb = d2l.box_corner_to_center(assigned_bb) offset_xy = 10 * (c_assigned_bb[:, :2] - c_anc[:, :2]) / c_anc[:, 2:] offset_wh = 5 * d2l.log(eps + c_assigned_bb[:, 2:] / c_anc[:, 2:]) offset = d2l.concat([offset_xy, offset_wh], axis=1) return offset
# Defined in file: ./chapter_computer-vision/anchor.md
[docs]def multibox_target(anchors, labels): batch_size, anchors = labels.shape[0], anchors.squeeze(0) batch_offset, batch_mask, batch_class_labels = [], [], [] device, num_anchors = anchors.device, anchors.shape[0] for i in range(batch_size): label = labels[i, :, :] anchors_bbox_map = match_anchor_to_bbox(label[:, 1:], anchors, device) bbox_mask = ((anchors_bbox_map >= 0).float().unsqueeze(-1)).repeat( 1, 4) # Initialize class_labels and assigned bbox coordinates with zeros class_labels = torch.zeros(num_anchors, dtype=torch.long, device=device) assigned_bb = torch.zeros((num_anchors, 4), dtype=torch.float32, device=device) # Assign class labels to the anchor boxes using matched gt bbox labels # If no gt bbox is assigned to an anchor box, then let the # class_labels and assigned_bb remain zero, i.e the background class indices_true = torch.nonzero(anchors_bbox_map >= 0) bb_idx = anchors_bbox_map[indices_true] class_labels[indices_true] = label[bb_idx, 0].long() + 1 assigned_bb[indices_true] = label[bb_idx, 1:] # offset transformations offset = offset_boxes(anchors, assigned_bb) * bbox_mask batch_offset.append(offset.reshape(-1)) batch_mask.append(bbox_mask.reshape(-1)) batch_class_labels.append(class_labels) bbox_offset = torch.stack(batch_offset) bbox_mask = torch.stack(batch_mask) class_labels = torch.stack(batch_class_labels) return (bbox_offset, bbox_mask, class_labels)
# Defined in file: ./chapter_computer-vision/anchor.md
[docs]def offset_inverse(anchors, offset_preds): c_anc = d2l.box_corner_to_center(anchors) c_pred_bb_xy = (offset_preds[:, :2] * c_anc[:, 2:] / 10) + c_anc[:, :2] c_pred_bb_wh = d2l.exp(offset_preds[:, 2:] / 5) * c_anc[:, 2:] c_pred_bb = d2l.concat((c_pred_bb_xy, c_pred_bb_wh), axis=1) predicted_bb = d2l.box_center_to_corner(c_pred_bb) return predicted_bb
# Defined in file: ./chapter_computer-vision/anchor.md
[docs]def nms(boxes, scores, iou_threshold): # sorting scores by the descending order and return their indices B = torch.argsort(scores, dim=-1, descending=True) keep = [] # boxes indices that will be kept while B.numel() > 0: i = B[0] keep.append(i) if B.numel() == 1: break iou = box_iou(boxes[i, :].reshape(-1, 4), boxes[B[1:], :].reshape(-1, 4)).reshape(-1) inds = torch.nonzero(iou <= iou_threshold).reshape(-1) B = B[inds + 1] return d2l.tensor(keep, device=boxes.device)
[docs]def multibox_detection(cls_probs, offset_preds, anchors, nms_threshold=0.5, pos_threshold=0.00999999978): device, batch_size = cls_probs.device, cls_probs.shape[0] anchors = anchors.squeeze(0) num_classes, num_anchors = cls_probs.shape[1], cls_probs.shape[2] out = [] for i in range(batch_size): cls_prob, offset_pred = cls_probs[i], offset_preds[i].reshape(-1, 4) conf, class_id = torch.max(cls_prob[1:], 0) predicted_bb = offset_inverse(anchors, offset_pred) keep = nms(predicted_bb, conf, 0.5) # Find all non_keep indices and set the class_id to background all_idx = torch.arange(num_anchors, dtype=torch.long, device=device) combined = torch.cat((keep, all_idx)) uniques, counts = combined.unique(return_counts=True) non_keep = uniques[counts == 1] all_id_sorted = torch.cat((keep, non_keep)) class_id[non_keep] = -1 class_id = class_id[all_id_sorted] conf, predicted_bb = conf[all_id_sorted], predicted_bb[all_id_sorted] # threshold to be a positive prediction below_min_idx = (conf < pos_threshold) class_id[below_min_idx] = -1 conf[below_min_idx] = 1 - conf[below_min_idx] pred_info = torch.cat( (class_id.unsqueeze(1), conf.unsqueeze(1), predicted_bb), dim=1) out.append(pred_info) return d2l.stack(out)
# Defined in file: ./chapter_computer-vision/object-detection-dataset.md d2l.DATA_HUB['banana-detection'] = ( d2l.DATA_URL + 'banana-detection.zip', '5de26c8fce5ccdea9f91267273464dc968d20d72') # Defined in file: ./chapter_computer-vision/object-detection-dataset.md
[docs]def read_data_bananas(is_train=True): """Read the bananas dataset images and labels.""" data_dir = d2l.download_extract('banana-detection') csv_fname = os.path.join(data_dir, 'bananas_train' if is_train else 'bananas_val', 'label.csv') csv_data = pd.read_csv(csv_fname) csv_data = csv_data.set_index('img_name') images, targets = [], [] for img_name, target in csv_data.iterrows(): images.append( torchvision.io.read_image( os.path.join(data_dir, 'bananas_train' if is_train else 'bananas_val', 'images', f'{img_name}'))) # Since all images have same object class i.e. category '0', # the `label` column corresponds to the only object i.e. banana # The target is as follows : (`label`, `xmin`, `ymin`, `xmax`, `ymax`) targets.append(list(target)) return images, torch.tensor(targets).unsqueeze(1) / 256
[docs]class BananasDataset(torch.utils.data.Dataset): def __init__(self, is_train): self.features, self.labels = read_data_bananas(is_train) print('read ' + str(len(self.features)) + ( f' training examples' if is_train else f' validation examples')) def __getitem__(self, idx): return (self.features[idx].float(), self.labels[idx]) def __len__(self): return len(self.features)
[docs]def load_data_bananas(batch_size): """Load the bananas dataset.""" train_iter = torch.utils.data.DataLoader(BananasDataset(is_train=True), batch_size, shuffle=True) val_iter = torch.utils.data.DataLoader(BananasDataset(is_train=False), batch_size) return (train_iter, val_iter)
# Defined in file: ./chapter_computer-vision/semantic-segmentation-and-dataset.md d2l.DATA_HUB['voc2012'] = (d2l.DATA_URL + 'VOCtrainval_11-May-2012.tar', '4e443f8a2eca6b1dac8a6c57641b67dd40621a49') # Defined in file: ./chapter_computer-vision/semantic-segmentation-and-dataset.md
[docs]def read_voc_images(voc_dir, is_train=True): """Read all VOC feature and label images.""" txt_fname = os.path.join(voc_dir, 'ImageSets', 'Segmentation', 'train.txt' if is_train else 'val.txt') mode = torchvision.io.image.ImageReadMode.RGB with open(txt_fname, 'r') as f: images = f.read().split() features, labels = [], [] for i, fname in enumerate(images): features.append( torchvision.io.read_image( os.path.join(voc_dir, 'JPEGImages', f'{fname}.jpg'))) labels.append( torchvision.io.read_image( os.path.join(voc_dir, 'SegmentationClass', f'{fname}.png'), mode)) return features, labels
# Defined in file: ./chapter_computer-vision/semantic-segmentation-and-dataset.md VOC_COLORMAP = [[0, 0, 0], [128, 0, 0], [0, 128, 0], [128, 128, 0], [0, 0, 128], [128, 0, 128], [0, 128, 128], [128, 128, 128], [64, 0, 0], [192, 0, 0], [64, 128, 0], [192, 128, 0], [64, 0, 128], [192, 0, 128], [64, 128, 128], [192, 128, 128], [0, 64, 0], [128, 64, 0], [0, 192, 0], [128, 192, 0], [0, 64, 128]] VOC_CLASSES = [ 'background', 'aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car', 'cat', 'chair', 'cow', 'diningtable', 'dog', 'horse', 'motorbike', 'person', 'potted plant', 'sheep', 'sofa', 'train', 'tv/monitor'] # Defined in file: ./chapter_computer-vision/semantic-segmentation-and-dataset.md
[docs]def build_colormap2label(): """Build an RGB color to label mapping for segmentation.""" colormap2label = torch.zeros(256**3, dtype=torch.long) for i, colormap in enumerate(VOC_COLORMAP): colormap2label[(colormap[0] * 256 + colormap[1]) * 256 + colormap[2]] = i return colormap2label
[docs]def voc_label_indices(colormap, colormap2label): """Map an RGB color to a label.""" colormap = colormap.permute(1, 2, 0).numpy().astype('int32') idx = ((colormap[:, :, 0] * 256 + colormap[:, :, 1]) * 256 + colormap[:, :, 2]) return colormap2label[idx]
# Defined in file: ./chapter_computer-vision/semantic-segmentation-and-dataset.md
[docs]def voc_rand_crop(feature, label, height, width): """Randomly crop for both feature and label images.""" rect = torchvision.transforms.RandomCrop.get_params( feature, (height, width)) feature = torchvision.transforms.functional.crop(feature, *rect) label = torchvision.transforms.functional.crop(label, *rect) return feature, label
# Defined in file: ./chapter_computer-vision/semantic-segmentation-and-dataset.md
[docs]class VOCSegDataset(torch.utils.data.Dataset): """A customized dataset to load VOC dataset.""" def __init__(self, is_train, crop_size, voc_dir): self.transform = torchvision.transforms.Normalize( mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) self.crop_size = crop_size features, labels = read_voc_images(voc_dir, is_train=is_train) self.features = [ self.normalize_image(feature) for feature in self.filter(features)] self.labels = self.filter(labels) self.colormap2label = build_colormap2label() print('read ' + str(len(self.features)) + ' examples')
[docs] def normalize_image(self, img): return self.transform(img.float())
[docs] def filter(self, imgs): return [ img for img in imgs if (img.shape[1] >= self.crop_size[0] and img.shape[2] >= self.crop_size[1])]
def __getitem__(self, idx): feature, label = voc_rand_crop(self.features[idx], self.labels[idx], *self.crop_size) return (feature, voc_label_indices(label, self.colormap2label)) def __len__(self): return len(self.features)
# Defined in file: ./chapter_computer-vision/semantic-segmentation-and-dataset.md
[docs]def load_data_voc(batch_size, crop_size): """Download and load the VOC2012 semantic dataset.""" voc_dir = d2l.download_extract('voc2012', os.path.join('VOCdevkit', 'VOC2012')) num_workers = d2l.get_dataloader_workers() train_iter = torch.utils.data.DataLoader( VOCSegDataset(True, crop_size, voc_dir), batch_size, shuffle=True, drop_last=True, num_workers=num_workers) test_iter = torch.utils.data.DataLoader( VOCSegDataset(False, crop_size, voc_dir), batch_size, drop_last=True, num_workers=num_workers) return train_iter, test_iter
# Defined in file: ./chapter_computer-vision/kaggle-cifar10.md d2l.DATA_HUB['cifar10_tiny'] = (d2l.DATA_URL + 'kaggle_cifar10_tiny.zip', '2068874e4b9a9f0fb07ebe0ad2b29754449ccacd') # Defined in file: ./chapter_computer-vision/kaggle-cifar10.md
[docs]def read_csv_labels(fname): """Read fname to return a name to label dictionary.""" with open(fname, 'r') as f: # Skip the file header line (column name) lines = f.readlines()[1:] tokens = [l.rstrip().split(',') for l in lines] return dict(((name, label) for name, label in tokens))
# Defined in file: ./chapter_computer-vision/kaggle-cifar10.md
[docs]def copyfile(filename, target_dir): """Copy a file into a target directory.""" os.makedirs(target_dir, exist_ok=True) shutil.copy(filename, target_dir)
[docs]def reorg_train_valid(data_dir, labels, valid_ratio): # The number of examples of the class with the least examples in the # training dataset n = collections.Counter(labels.values()).most_common()[-1][1] # The number of examples per class for the validation set n_valid_per_label = max(1, math.floor(n * valid_ratio)) label_count = {} for train_file in os.listdir(os.path.join(data_dir, 'train')): label = labels[train_file.split('.')[0]] fname = os.path.join(data_dir, 'train', train_file) # Copy to train_valid_test/train_valid with a subfolder per class copyfile( fname, os.path.join(data_dir, 'train_valid_test', 'train_valid', label)) if label not in label_count or label_count[label] < n_valid_per_label: # Copy to train_valid_test/valid copyfile( fname, os.path.join(data_dir, 'train_valid_test', 'valid', label)) label_count[label] = label_count.get(label, 0) + 1 else: # Copy to train_valid_test/train copyfile( fname, os.path.join(data_dir, 'train_valid_test', 'train', label)) return n_valid_per_label
# Defined in file: ./chapter_computer-vision/kaggle-cifar10.md
[docs]def reorg_test(data_dir): for test_file in os.listdir(os.path.join(data_dir, 'test')): copyfile( os.path.join(data_dir, 'test', test_file), os.path.join(data_dir, 'train_valid_test', 'test', 'unknown'))
# Defined in file: ./chapter_computer-vision/kaggle-dog.md d2l.DATA_HUB['dog_tiny'] = (d2l.DATA_URL + 'kaggle_dog_tiny.zip', '0cb91d09b814ecdc07b50f31f8dcad3e81d6a86d') # Defined in file: ./chapter_natural-language-processing-pretraining/word-embedding-dataset.md d2l.DATA_HUB['ptb'] = (d2l.DATA_URL + 'ptb.zip', '319d85e578af0cdc590547f26231e4e31cdf1e42')
[docs]def read_ptb(): data_dir = d2l.download_extract('ptb') with open(os.path.join(data_dir, 'ptb.train.txt')) as f: raw_text = f.read() return [line.split() for line in raw_text.split('\n')]
# Defined in file: ./chapter_natural-language-processing-pretraining/word-embedding-dataset.md
[docs]def subsampling(sentences, vocab): # Map low frequency words into <unk> sentences = [[vocab.idx_to_token[vocab[tk]] for tk in line] for line in sentences] # Count the frequency for each word counter = d2l.count_corpus(sentences) num_tokens = sum(counter.values()) # Return True if to keep this token during subsampling def keep(token): return (random.uniform(0, 1) < math.sqrt( 1e-4 / counter[token] * num_tokens)) # Now do the subsampling return [[tk for tk in line if keep(tk)] for line in sentences]
# Defined in file: ./chapter_natural-language-processing-pretraining/word-embedding-dataset.md
[docs]def get_centers_and_contexts(corpus, max_window_size): centers, contexts = [], [] for line in corpus: # Each sentence needs at least 2 words to form a "central target word # - context word" pair if len(line) < 2: continue centers += line for i in range(len(line)): # Context window centered at i window_size = random.randint(1, max_window_size) indices = list( range(max(0, i - window_size), min(len(line), i + 1 + window_size))) # Exclude the central target word from the context words indices.remove(i) contexts.append([line[idx] for idx in indices]) return centers, contexts
# Defined in file: ./chapter_natural-language-processing-pretraining/word-embedding-dataset.md
[docs]class RandomGenerator: """Draw a random int in [0, n] according to n sampling weights.""" def __init__(self, sampling_weights): self.population = list(range(len(sampling_weights))) self.sampling_weights = sampling_weights self.candidates = [] self.i = 0
[docs] def draw(self): if self.i == len(self.candidates): self.candidates = random.choices(self.population, self.sampling_weights, k=10000) self.i = 0 self.i += 1 return self.candidates[self.i - 1]
# Defined in file: ./chapter_natural-language-processing-pretraining/word-embedding-dataset.md
[docs]def get_negatives(all_contexts, corpus, K): counter = d2l.count_corpus(corpus) sampling_weights = [counter[i]**0.75 for i in range(len(counter))] all_negatives, generator = [], RandomGenerator(sampling_weights) for contexts in all_contexts: negatives = [] while len(negatives) < len(contexts) * K: neg = generator.draw() # Noise words cannot be context words if neg not in contexts: negatives.append(neg) all_negatives.append(negatives) return all_negatives
# Defined in file: ./chapter_natural-language-processing-pretraining/word-embedding-dataset.md
[docs]def batchify(data): max_len = max(len(c) + len(n) for _, c, n in data) centers, contexts_negatives, masks, labels = [], [], [], [] for center, context, negative in data: cur_len = len(context) + len(negative) centers += [center] contexts_negatives += [context + negative + [0] * (max_len - cur_len)] masks += [[1] * cur_len + [0] * (max_len - cur_len)] labels += [[1] * len(context) + [0] * (max_len - len(context))] return (d2l.reshape(d2l.tensor(centers), (-1, 1)), d2l.tensor(contexts_negatives), d2l.tensor(masks), d2l.tensor(labels))
# Defined in file: ./chapter_natural-language-processing-pretraining/word-embedding-dataset.md
[docs]def load_data_ptb(batch_size, max_window_size, num_noise_words): num_workers = d2l.get_dataloader_workers() sentences = read_ptb() vocab = d2l.Vocab(sentences, min_freq=10) subsampled = subsampling(sentences, vocab) corpus = [vocab[line] for line in subsampled] all_centers, all_contexts = get_centers_and_contexts( corpus, max_window_size) all_negatives = get_negatives(all_contexts, corpus, num_noise_words) class PTBDataset(torch.utils.data.Dataset): def __init__(self, centers, contexts, negatives): assert len(centers) == len(contexts) == len(negatives) self.centers = centers self.contexts = contexts self.negatives = negatives def __getitem__(self, index): return (self.centers[index], self.contexts[index], self.negatives[index]) def __len__(self): return len(self.centers) dataset = PTBDataset(all_centers, all_contexts, all_negatives) data_iter = torch.utils.data.DataLoader(dataset, batch_size, shuffle=True, collate_fn=batchify, num_workers=num_workers) return data_iter, vocab
# Defined in file: ./chapter_natural-language-processing-pretraining/similarity-analogy.md d2l.DATA_HUB['glove.6b.50d'] = (d2l.DATA_URL + 'glove.6B.50d.zip', '0b8703943ccdb6eb788e6f091b8946e82231bc4d') d2l.DATA_HUB['glove.6b.100d'] = (d2l.DATA_URL + 'glove.6B.100d.zip', 'cd43bfb07e44e6f27cbcc7bc9ae3d80284fdaf5a') d2l.DATA_HUB['glove.42b.300d'] = (d2l.DATA_URL + 'glove.42B.300d.zip', 'b5116e234e9eb9076672cfeabf5469f3eec904fa') d2l.DATA_HUB['wiki.en'] = (d2l.DATA_URL + 'wiki.en.zip', 'c1816da3821ae9f43899be655002f6c723e91b88') # Defined in file: ./chapter_natural-language-processing-pretraining/similarity-analogy.md
[docs]class TokenEmbedding: """Token Embedding.""" def __init__(self, embedding_name): self.idx_to_token, self.idx_to_vec = self._load_embedding( embedding_name) self.unknown_idx = 0 self.token_to_idx = { token: idx for idx, token in enumerate(self.idx_to_token)} def _load_embedding(self, embedding_name): idx_to_token, idx_to_vec = ['<unk>'], [] data_dir = d2l.download_extract(embedding_name) # GloVe website: https://nlp.stanford.edu/projects/glove/ # fastText website: https://fasttext.cc/ with open(os.path.join(data_dir, 'vec.txt'), 'r') as f: for line in f: elems = line.rstrip().split(' ') token, elems = elems[0], [float(elem) for elem in elems[1:]] # Skip header information, such as the top row in fastText if len(elems) > 1: idx_to_token.append(token) idx_to_vec.append(elems) idx_to_vec = [[0] * len(idx_to_vec[0])] + idx_to_vec return idx_to_token, d2l.tensor(idx_to_vec) def __getitem__(self, tokens): indices = [ self.token_to_idx.get(token, self.unknown_idx) for token in tokens] vecs = self.idx_to_vec[d2l.tensor(indices)] return vecs def __len__(self): return len(self.idx_to_token)
# Defined in file: ./chapter_natural-language-processing-pretraining/bert.md
[docs]def get_tokens_and_segments(tokens_a, tokens_b=None): tokens = ['<cls>'] + tokens_a + ['<sep>'] # 0 and 1 are marking segment A and B, respectively segments = [0] * (len(tokens_a) + 2) if tokens_b is not None: tokens += tokens_b + ['<sep>'] segments += [1] * (len(tokens_b) + 1) return tokens, segments
# Defined in file: ./chapter_natural-language-processing-pretraining/bert.md
[docs]class BERTEncoder(nn.Module): def __init__(self, vocab_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers, dropout, max_len=1000, key_size=768, query_size=768, value_size=768, **kwargs): super(BERTEncoder, self).__init__(**kwargs) self.token_embedding = nn.Embedding(vocab_size, num_hiddens) self.segment_embedding = nn.Embedding(2, num_hiddens) self.blks = nn.Sequential() for i in range(num_layers): self.blks.add_module( f"{i}", d2l.EncoderBlock(key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, dropout, True)) # In BERT, positional embeddings are learnable, thus we create a # parameter of positional embeddings that are long enough self.pos_embedding = nn.Parameter(torch.randn(1, max_len, num_hiddens))
[docs] def forward(self, tokens, segments, valid_lens): # Shape of `X` remains unchanged in the following code snippet: # (batch size, max sequence length, `num_hiddens`) X = self.token_embedding(tokens) + self.segment_embedding(segments) X = X + self.pos_embedding.data[:, :X.shape[1], :] for blk in self.blks: X = blk(X, valid_lens) return X
# Defined in file: ./chapter_natural-language-processing-pretraining/bert.md
[docs]class MaskLM(nn.Module): def __init__(self, vocab_size, num_hiddens, num_inputs=768, **kwargs): super(MaskLM, self).__init__(**kwargs) self.mlp = nn.Sequential(nn.Linear(num_inputs, num_hiddens), nn.ReLU(), nn.LayerNorm(num_hiddens), nn.Linear(num_hiddens, vocab_size))
[docs] def forward(self, X, pred_positions): num_pred_positions = pred_positions.shape[1] pred_positions = pred_positions.reshape(-1) batch_size = X.shape[0] batch_idx = torch.arange(0, batch_size) # Suppose that `batch_size` = 2, `num_pred_positions` = 3, then # `batch_idx` is `torch.tensor([0, 0, 0, 1, 1, 1])` batch_idx = torch.repeat_interleave(batch_idx, num_pred_positions) masked_X = X[batch_idx, pred_positions] masked_X = masked_X.reshape((batch_size, num_pred_positions, -1)) mlm_Y_hat = self.mlp(masked_X) return mlm_Y_hat
# Defined in file: ./chapter_natural-language-processing-pretraining/bert.md
[docs]class NextSentencePred(nn.Module): def __init__(self, num_inputs, **kwargs): super(NextSentencePred, self).__init__(**kwargs) self.output = nn.Linear(num_inputs, 2)
[docs] def forward(self, X): # `X` shape: (batch size, `num_hiddens`) return self.output(X)
# Defined in file: ./chapter_natural-language-processing-pretraining/bert.md
[docs]class BERTModel(nn.Module): def __init__(self, vocab_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers, dropout, max_len=1000, key_size=768, query_size=768, value_size=768, hid_in_features=768, mlm_in_features=768, nsp_in_features=768): super(BERTModel, self).__init__() self.encoder = BERTEncoder(vocab_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers, dropout, max_len=max_len, key_size=key_size, query_size=query_size, value_size=value_size) self.hidden = nn.Sequential(nn.Linear(hid_in_features, num_hiddens), nn.Tanh()) self.mlm = MaskLM(vocab_size, num_hiddens, mlm_in_features) self.nsp = NextSentencePred(nsp_in_features)
[docs] def forward(self, tokens, segments, valid_lens=None, pred_positions=None): encoded_X = self.encoder(tokens, segments, valid_lens) if pred_positions is not None: mlm_Y_hat = self.mlm(encoded_X, pred_positions) else: mlm_Y_hat = None # The hidden layer of the MLP classifier for next sentence prediction. # 0 is the index of the '<cls>' token nsp_Y_hat = self.nsp(self.hidden(encoded_X[:, 0, :])) return encoded_X, mlm_Y_hat, nsp_Y_hat
# Defined in file: ./chapter_natural-language-processing-pretraining/bert-dataset.md d2l.DATA_HUB['wikitext-2'] = ( 'https://s3.amazonaws.com/research.metamind.io/wikitext/' 'wikitext-2-v1.zip', '3c914d17d80b1459be871a5039ac23e752a53cbe') def _read_wiki(data_dir): file_name = os.path.join(data_dir, 'wiki.train.tokens') with open(file_name, 'r') as f: lines = f.readlines() # Uppercase letters are converted to lowercase ones paragraphs = [ line.strip().lower().split(' . ') for line in lines if len(line.split(' . ')) >= 2] random.shuffle(paragraphs) return paragraphs # Defined in file: ./chapter_natural-language-processing-pretraining/bert-dataset.md def _get_next_sentence(sentence, next_sentence, paragraphs): if random.random() < 0.5: is_next = True else: # `paragraphs` is a list of lists of lists next_sentence = random.choice(random.choice(paragraphs)) is_next = False return sentence, next_sentence, is_next # Defined in file: ./chapter_natural-language-processing-pretraining/bert-dataset.md def _get_nsp_data_from_paragraph(paragraph, paragraphs, vocab, max_len): nsp_data_from_paragraph = [] for i in range(len(paragraph) - 1): tokens_a, tokens_b, is_next = _get_next_sentence( paragraph[i], paragraph[i + 1], paragraphs) # Consider 1 '<cls>' token and 2 '<sep>' tokens if len(tokens_a) + len(tokens_b) + 3 > max_len: continue tokens, segments = d2l.get_tokens_and_segments(tokens_a, tokens_b) nsp_data_from_paragraph.append((tokens, segments, is_next)) return nsp_data_from_paragraph # Defined in file: ./chapter_natural-language-processing-pretraining/bert-dataset.md def _replace_mlm_tokens(tokens, candidate_pred_positions, num_mlm_preds, vocab): # Make a new copy of tokens for the input of a masked language model, # where the input may contain replaced '<mask>' or random tokens mlm_input_tokens = [token for token in tokens] pred_positions_and_labels = [] # Shuffle for getting 15% random tokens for prediction in the masked # language modeling task random.shuffle(candidate_pred_positions) for mlm_pred_position in candidate_pred_positions: if len(pred_positions_and_labels) >= num_mlm_preds: break masked_token = None # 80% of the time: replace the word with the '<mask>' token if random.random() < 0.8: masked_token = '<mask>' else: # 10% of the time: keep the word unchanged if random.random() < 0.5: masked_token = tokens[mlm_pred_position] # 10% of the time: replace the word with a random word else: masked_token = random.randint(0, len(vocab) - 1) mlm_input_tokens[mlm_pred_position] = masked_token pred_positions_and_labels.append( (mlm_pred_position, tokens[mlm_pred_position])) return mlm_input_tokens, pred_positions_and_labels # Defined in file: ./chapter_natural-language-processing-pretraining/bert-dataset.md def _get_mlm_data_from_tokens(tokens, vocab): candidate_pred_positions = [] # `tokens` is a list of strings for i, token in enumerate(tokens): # Special tokens are not predicted in the masked language modeling # task if token in ['<cls>', '<sep>']: continue candidate_pred_positions.append(i) # 15% of random tokens are predicted in the masked language modeling task num_mlm_preds = max(1, round(len(tokens) * 0.15)) mlm_input_tokens, pred_positions_and_labels = _replace_mlm_tokens( tokens, candidate_pred_positions, num_mlm_preds, vocab) pred_positions_and_labels = sorted(pred_positions_and_labels, key=lambda x: x[0]) pred_positions = [v[0] for v in pred_positions_and_labels] mlm_pred_labels = [v[1] for v in pred_positions_and_labels] return vocab[mlm_input_tokens], pred_positions, vocab[mlm_pred_labels] # Defined in file: ./chapter_natural-language-processing-pretraining/bert-dataset.md def _pad_bert_inputs(examples, max_len, vocab): max_num_mlm_preds = round(max_len * 0.15) all_token_ids, all_segments, valid_lens, = [], [], [] all_pred_positions, all_mlm_weights, all_mlm_labels = [], [], [] nsp_labels = [] for (token_ids, pred_positions, mlm_pred_label_ids, segments, is_next) in examples: all_token_ids.append( torch.tensor( token_ids + [vocab['<pad>']] * (max_len - len(token_ids)), dtype=torch.long)) all_segments.append( torch.tensor(segments + [0] * (max_len - len(segments)), dtype=torch.long)) # `valid_lens` excludes count of '<pad>' tokens valid_lens.append(torch.tensor(len(token_ids), dtype=torch.float32)) all_pred_positions.append( torch.tensor( pred_positions + [0] * (max_num_mlm_preds - len(pred_positions)), dtype=torch.long)) # Predictions of padded tokens will be filtered out in the loss via # multiplication of 0 weights all_mlm_weights.append( torch.tensor([1.0] * len(mlm_pred_label_ids) + [0.0] * (max_num_mlm_preds - len(pred_positions)), dtype=torch.float32)) all_mlm_labels.append( torch.tensor( mlm_pred_label_ids + [0] * (max_num_mlm_preds - len(mlm_pred_label_ids)), dtype=torch.long)) nsp_labels.append(torch.tensor(is_next, dtype=torch.long)) return (all_token_ids, all_segments, valid_lens, all_pred_positions, all_mlm_weights, all_mlm_labels, nsp_labels) # Defined in file: ./chapter_natural-language-processing-pretraining/bert-dataset.md class _WikiTextDataset(torch.utils.data.Dataset): def __init__(self, paragraphs, max_len): # Input `paragraphs[i]` is a list of sentence strings representing a # paragraph; while output `paragraphs[i]` is a list of sentences # representing a paragraph, where each sentence is a list of tokens paragraphs = [ d2l.tokenize(paragraph, token='word') for paragraph in paragraphs] sentences = [ sentence for paragraph in paragraphs for sentence in paragraph] self.vocab = d2l.Vocab( sentences, min_freq=5, reserved_tokens=['<pad>', '<mask>', '<cls>', '<sep>']) # Get data for the next sentence prediction task examples = [] for paragraph in paragraphs: examples.extend( _get_nsp_data_from_paragraph(paragraph, paragraphs, self.vocab, max_len)) # Get data for the masked language model task examples = [(_get_mlm_data_from_tokens(tokens, self.vocab) + (segments, is_next)) for tokens, segments, is_next in examples] # Pad inputs (self.all_token_ids, self.all_segments, self.valid_lens, self.all_pred_positions, self.all_mlm_weights, self.all_mlm_labels, self.nsp_labels) = _pad_bert_inputs(examples, max_len, self.vocab) def __getitem__(self, idx): return (self.all_token_ids[idx], self.all_segments[idx], self.valid_lens[idx], self.all_pred_positions[idx], self.all_mlm_weights[idx], self.all_mlm_labels[idx], self.nsp_labels[idx]) def __len__(self): return len(self.all_token_ids) # Defined in file: ./chapter_natural-language-processing-pretraining/bert-dataset.md
[docs]def load_data_wiki(batch_size, max_len): num_workers = d2l.get_dataloader_workers() data_dir = d2l.download_extract('wikitext-2', 'wikitext-2') paragraphs = _read_wiki(data_dir) train_set = _WikiTextDataset(paragraphs, max_len) train_iter = torch.utils.data.DataLoader(train_set, batch_size, shuffle=True, num_workers=num_workers) return train_iter, train_set.vocab
# Defined in file: ./chapter_natural-language-processing-pretraining/bert-pretraining.md def _get_batch_loss_bert(net, loss, vocab_size, tokens_X, segments_X, valid_lens_x, pred_positions_X, mlm_weights_X, mlm_Y, nsp_y): # Forward pass _, mlm_Y_hat, nsp_Y_hat = net(tokens_X, segments_X, valid_lens_x.reshape(-1), pred_positions_X) # Compute masked language model loss mlm_l = loss(mlm_Y_hat.reshape(-1, vocab_size), mlm_Y.reshape(-1)) *\ mlm_weights_X.reshape(-1, 1) mlm_l = mlm_l.sum() / (mlm_weights_X.sum() + 1e-8) # Compute next sentence prediction loss nsp_l = loss(nsp_Y_hat, nsp_y) l = mlm_l + nsp_l return mlm_l, nsp_l, l # Defined in file: ./chapter_natural-language-processing-applications/sentiment-analysis-and-dataset.md d2l.DATA_HUB['aclImdb'] = ( 'http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz', '01ada507287d82875905620988597833ad4e0903') # Defined in file: ./chapter_natural-language-processing-applications/sentiment-analysis-and-dataset.md
[docs]def read_imdb(data_dir, is_train): data, labels = [], [] for label in ('pos', 'neg'): folder_name = os.path.join(data_dir, 'train' if is_train else 'test', label) for file in os.listdir(folder_name): with open(os.path.join(folder_name, file), 'rb') as f: review = f.read().decode('utf-8').replace('\n', '') data.append(review) labels.append(1 if label == 'pos' else 0) return data, labels
# Defined in file: ./chapter_natural-language-processing-applications/sentiment-analysis-and-dataset.md
[docs]def load_data_imdb(batch_size, num_steps=500): data_dir = d2l.download_extract('aclImdb', 'aclImdb') train_data = read_imdb(data_dir, True) test_data = read_imdb(data_dir, False) train_tokens = d2l.tokenize(train_data[0], token='word') test_tokens = d2l.tokenize(test_data[0], token='word') vocab = d2l.Vocab(train_tokens, min_freq=5) train_features = torch.tensor([ d2l.truncate_pad(vocab[line], num_steps, vocab['<pad>']) for line in train_tokens]) test_features = torch.tensor([ d2l.truncate_pad(vocab[line], num_steps, vocab['<pad>']) for line in test_tokens]) train_iter = d2l.load_array((train_features, torch.tensor(train_data[1])), batch_size) test_iter = d2l.load_array((test_features, torch.tensor(test_data[1])), batch_size, is_train=False) return train_iter, test_iter, vocab
# Defined in file: ./chapter_natural-language-processing-applications/sentiment-analysis-rnn.md
[docs]def predict_sentiment(net, vocab, sentence): sentence = torch.tensor(vocab[sentence.split()], device=d2l.try_gpu()) label = torch.argmax(net(sentence.reshape(1, -1)), dim=1) return 'positive' if label == 1 else 'negative'
# Defined in file: ./chapter_natural-language-processing-applications/natural-language-inference-and-dataset.md d2l.DATA_HUB['SNLI'] = ('https://nlp.stanford.edu/projects/snli/snli_1.0.zip', '9fcde07509c7e87ec61c640c1b2753d9041758e4') # Defined in file: ./chapter_natural-language-processing-applications/natural-language-inference-and-dataset.md
[docs]def read_snli(data_dir, is_train): """Read the SNLI dataset into premises, hypotheses, and labels.""" def extract_text(s): # Remove information that will not be used by us s = re.sub('\\(', '', s) s = re.sub('\\)', '', s) # Substitute two or more consecutive whitespace with space s = re.sub('\\s{2,}', ' ', s) return s.strip() label_set = {'entailment': 0, 'contradiction': 1, 'neutral': 2} file_name = os.path.join( data_dir, 'snli_1.0_train.txt' if is_train else 'snli_1.0_test.txt') with open(file_name, 'r') as f: rows = [row.split('\t') for row in f.readlines()[1:]] premises = [extract_text(row[1]) for row in rows if row[0] in label_set] hypotheses = [extract_text(row[2]) for row in rows if row[0] in label_set] labels = [label_set[row[0]] for row in rows if row[0] in label_set] return premises, hypotheses, labels
# Defined in file: ./chapter_natural-language-processing-applications/natural-language-inference-and-dataset.md
[docs]class SNLIDataset(torch.utils.data.Dataset): """A customized dataset to load the SNLI dataset.""" def __init__(self, dataset, num_steps, vocab=None): self.num_steps = num_steps all_premise_tokens = d2l.tokenize(dataset[0]) all_hypothesis_tokens = d2l.tokenize(dataset[1]) if vocab is None: self.vocab = d2l.Vocab(all_premise_tokens + all_hypothesis_tokens, min_freq=5, reserved_tokens=['<pad>']) else: self.vocab = vocab self.premises = self._pad(all_premise_tokens) self.hypotheses = self._pad(all_hypothesis_tokens) self.labels = torch.tensor(dataset[2]) print('read ' + str(len(self.premises)) + ' examples') def _pad(self, lines): return torch.tensor([ d2l.truncate_pad(self.vocab[line], self.num_steps, self.vocab['<pad>']) for line in lines]) def __getitem__(self, idx): return (self.premises[idx], self.hypotheses[idx]), self.labels[idx] def __len__(self): return len(self.premises)
# Defined in file: ./chapter_natural-language-processing-applications/natural-language-inference-and-dataset.md
[docs]def load_data_snli(batch_size, num_steps=50): """Download the SNLI dataset and return data iterators and vocabulary.""" num_workers = d2l.get_dataloader_workers() data_dir = d2l.download_extract('SNLI') train_data = read_snli(data_dir, True) test_data = read_snli(data_dir, False) train_set = SNLIDataset(train_data, num_steps) test_set = SNLIDataset(test_data, num_steps, train_set.vocab) train_iter = torch.utils.data.DataLoader(train_set, batch_size, shuffle=True, num_workers=num_workers) test_iter = torch.utils.data.DataLoader(test_set, batch_size, shuffle=False, num_workers=num_workers) return train_iter, test_iter, train_set.vocab
# Defined in file: ./chapter_natural-language-processing-applications/natural-language-inference-attention.md
[docs]def predict_snli(net, vocab, premise, hypothesis): net.eval() premise = torch.tensor(vocab[premise], device=d2l.try_gpu()) hypothesis = torch.tensor(vocab[hypothesis], device=d2l.try_gpu()) label = torch.argmax( net([premise.reshape((1, -1)), hypothesis.reshape((1, -1))]), dim=1) return 'entailment' if label == 0 else 'contradiction' if label == 1 \ else 'neutral'
# Defined in file: ./chapter_generative-adversarial-networks/gan.md
[docs]def update_D(X, Z, net_D, net_G, loss, trainer_D): """Update discriminator.""" batch_size = X.shape[0] ones = torch.ones((batch_size,), device=X.device) zeros = torch.zeros((batch_size,), device=X.device) trainer_D.zero_grad() real_Y = net_D(X) fake_X = net_G(Z) # Do not need to compute gradient for `net_G`, detach it from # computing gradients. fake_Y = net_D(fake_X.detach()) loss_D = (loss(real_Y, ones.reshape(real_Y.shape)) + loss(fake_Y, zeros.reshape(fake_Y.shape))) / 2 loss_D.backward() trainer_D.step() return loss_D
# Defined in file: ./chapter_generative-adversarial-networks/gan.md
[docs]def update_G(Z, net_D, net_G, loss, trainer_G): """Update generator.""" batch_size = Z.shape[0] ones = torch.ones((batch_size,), device=Z.device) trainer_G.zero_grad() # We could reuse `fake_X` from `update_D` to save computation fake_X = net_G(Z) # Recomputing `fake_Y` is needed since `net_D` is changed fake_Y = net_D(fake_X) loss_G = loss(fake_Y, ones.reshape(fake_Y.shape)) loss_G.backward() trainer_G.step() return loss_G
# Defined in file: ./chapter_generative-adversarial-networks/dcgan.md d2l.DATA_HUB['pokemon'] = (d2l.DATA_URL + 'pokemon.zip', 'c065c0e2593b8b161a2d7873e42418bf6a21106c') # Alias defined in config.ini ones = torch.ones zeros = torch.zeros tensor = torch.tensor arange = torch.arange meshgrid = torch.meshgrid sin = torch.sin sinh = torch.sinh cos = torch.cos cosh = torch.cosh tanh = torch.tanh linspace = torch.linspace exp = torch.exp log = torch.log normal = torch.normal rand = torch.rand matmul = torch.matmul int32 = torch.int32 float32 = torch.float32 concat = torch.cat stack = torch.stack abs = torch.abs eye = torch.eye numpy = lambda x, *args, **kwargs: x.detach().numpy(*args, **kwargs) size = lambda x, *args, **kwargs: x.numel(*args, **kwargs) reshape = lambda x, *args, **kwargs: x.reshape(*args, **kwargs) to = lambda x, *args, **kwargs: x.to(*args, **kwargs) reduce_sum = lambda x, *args, **kwargs: x.sum(*args, **kwargs) argmax = lambda x, *args, **kwargs: x.argmax(*args, **kwargs) astype = lambda x, *args, **kwargs: x.type(*args, **kwargs) transpose = lambda x, *args, **kwargs: x.t(*args, **kwargs)