# Source code for d2l.tensorflow
# This file is generated automatically through:
# d2lbook build lib
# Don't edit it directly
# Defined in file: ./chapter_preface/index.md
import collections
import hashlib
import math
import os
import random
import re
import shutil
import sys
import tarfile
import time
import zipfile
from collections import defaultdict
import pandas as pd
import requests
from IPython import display
from matplotlib import pyplot as plt
d2l = sys.modules[__name__]
# Defined in file: ./chapter_preface/index.md
import numpy as np
import tensorflow as tf
# Defined in file: ./chapter_preliminaries/calculus.md
def use_svg_display():
"""Use the svg format to display a plot in Jupyter."""
display.set_matplotlib_formats('svg')
# Defined in file: ./chapter_preliminaries/calculus.md
def set_figsize(figsize=(3.5, 2.5)):
"""Set the figure size for matplotlib."""
use_svg_display()
d2l.plt.rcParams['figure.figsize'] = figsize
# Defined in file: ./chapter_preliminaries/calculus.md
def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):
"""Set the axes for matplotlib."""
axes.set_xlabel(xlabel)
axes.set_ylabel(ylabel)
axes.set_xscale(xscale)
axes.set_yscale(yscale)
axes.set_xlim(xlim)
axes.set_ylim(ylim)
if legend:
axes.legend(legend)
axes.grid()
# Defined in file: ./chapter_preliminaries/calculus.md
def plot(X, Y=None, xlabel=None, ylabel=None, legend=None, xlim=None,
ylim=None, xscale='linear', yscale='linear',
fmts=('-', 'm--', 'g-.', 'r:'), figsize=(3.5, 2.5), axes=None):
"""Plot data points."""
if legend is None:
legend = []
set_figsize(figsize)
axes = axes if axes else d2l.plt.gca()
# Return True if `X` (tensor or list) has 1 axis
def has_one_axis(X):
return (hasattr(X, "ndim") and X.ndim == 1 or
isinstance(X, list) and not hasattr(X[0], "__len__"))
if has_one_axis(X):
X = [X]
if Y is None:
X, Y = [[]] * len(X), X
elif has_one_axis(Y):
Y = [Y]
if len(X) != len(Y):
X = X * len(Y)
axes.cla()
for x, y, fmt in zip(X, Y, fmts):
if len(x):
axes.plot(x, y, fmt)
else:
axes.plot(y, fmt)
set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
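# Usage sketch (illustrative, not part of the generated library): plotting
# f(x) = x**2 and its tangent line at x = 1 with hypothetical data.
#   x = np.arange(0, 3, 0.1)
#   d2l.plot(x, [x**2, 2 * x - 1], 'x', 'f(x)', legend=['f(x)', 'tangent'])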
# Defined in file: ./chapter_linear-networks/linear-regression.md
class Timer:
    """Record multiple running times."""
    def __init__(self):
        self.times = []
        self.start()

    def start(self):
        """Start the timer."""
        self.tik = time.time()

    def stop(self):
        """Stop the timer and record the time in a list."""
        self.times.append(time.time() - self.tik)
        return self.times[-1]

    def avg(self):
        """Return the average time."""
        return sum(self.times) / len(self.times)

    def cumsum(self):
        """Return the accumulated time."""
        return np.array(self.times).cumsum().tolist()
# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
def synthetic_data(w, b, num_examples):
"""Generate y = Xw + b + noise."""
X = d2l.zeros((num_examples, w.shape[0]))
X += tf.random.normal(shape=X.shape)
y = d2l.matmul(X, tf.reshape(w, (-1, 1))) + b
y += tf.random.normal(shape=y.shape, stddev=0.01)
y = d2l.reshape(y, (-1, 1))
return X, y
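# Usage sketch (illustrative only): generating a small regression dataset
# from a hypothetical weight vector and bias.
#   true_w, true_b = d2l.tensor([2.0, -3.4]), 4.2
#   features, labels = synthetic_data(true_w, true_b, 1000)
#   features.shape, labels.shape  # (1000, 2), (1000, 1)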
# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
def linreg(X, w, b):
    """The linear regression model."""
    return d2l.matmul(X, w) + b

# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
def squared_loss(y_hat, y):
"""Squared loss."""
return (y_hat - d2l.reshape(y, y_hat.shape))**2 / 2
# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
def sgd(params, grads, lr, batch_size):
"""Minibatch stochastic gradient descent."""
for param, grad in zip(params, grads):
param.assign_sub(lr * grad / batch_size)
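# Usage sketch (illustrative only): one update step on hypothetical
# `tf.Variable` parameters, where `grads` holds the minibatch gradients
# (e.g. from `tape.gradient(l, [w, b])`).
#   sgd([w, b], grads, lr=0.03, batch_size=10)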
# Defined in file: ./chapter_linear-networks/linear-regression-concise.md
def load_array(data_arrays, batch_size, is_train=True):
"""Construct a TensorFlow data iterator."""
dataset = tf.data.Dataset.from_tensor_slices(data_arrays)
if is_train:
dataset = dataset.shuffle(buffer_size=1000)
dataset = dataset.batch(batch_size)
return dataset
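# Usage sketch (illustrative only): wrapping in-memory tensors (e.g. the
# output of `synthetic_data`) in a shuffled, batched tf.data pipeline.
#   data_iter = load_array((features, labels), batch_size=10)
#   X, y = next(iter(data_iter))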
# Defined in file: ./chapter_linear-networks/image-classification-dataset.md
def get_fashion_mnist_labels(labels):
"""Return text labels for the Fashion-MNIST dataset."""
text_labels = [
't-shirt', 'trouser', 'pullover', 'dress', 'coat', 'sandal', 'shirt',
'sneaker', 'bag', 'ankle boot']
return [text_labels[int(i)] for i in labels]
# Defined in file: ./chapter_linear-networks/image-classification-dataset.md
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):
"""Plot a list of images."""
figsize = (num_cols * scale, num_rows * scale)
_, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize)
axes = axes.flatten()
for i, (ax, img) in enumerate(zip(axes, imgs)):
ax.imshow(d2l.numpy(img))
ax.axes.get_xaxis().set_visible(False)
ax.axes.get_yaxis().set_visible(False)
if titles:
ax.set_title(titles[i])
return axes
# Defined in file: ./chapter_linear-networks/image-classification-dataset.md
def load_data_fashion_mnist(batch_size, resize=None):
"""Download the Fashion-MNIST dataset and then load it into memory."""
mnist_train, mnist_test = tf.keras.datasets.fashion_mnist.load_data()
    # Divide all numbers by 255 so that all pixel values are between
    # 0 and 1, add a channel dimension at the end, and cast the labels
    # to int32
process = lambda X, y: (tf.expand_dims(X, axis=3) / 255,
tf.cast(y, dtype='int32'))
resize_fn = lambda X, y: (tf.image.resize_with_pad(X, resize, resize)
if resize else X, y)
return (tf.data.Dataset.from_tensor_slices(
process(*mnist_train)).batch(batch_size).shuffle(len(
mnist_train[0])).map(resize_fn),
tf.data.Dataset.from_tensor_slices(
process(*mnist_test)).batch(batch_size).map(resize_fn))
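# Usage sketch (illustrative only): the returned iterators yield image
# batches of shape (batch_size, 28, 28, 1) and int32 labels.
#   train_iter, test_iter = load_data_fashion_mnist(batch_size=256)
#   for X, y in train_iter:
#       print(X.shape, y.dtype)
#       break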
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
def accuracy(y_hat, y):
"""Compute the number of correct predictions."""
if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
y_hat = d2l.argmax(y_hat, axis=1)
cmp = d2l.astype(y_hat, y.dtype) == y
return float(d2l.reduce_sum(d2l.astype(cmp, y.dtype)))
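# Usage sketch (illustrative only): `accuracy` returns the number of correct
# predictions, so dividing by the number of labels gives the accuracy rate.
#   y_hat = d2l.tensor([[0.1, 0.3, 0.6], [0.3, 0.2, 0.5]])
#   y = d2l.tensor([0, 2])
#   accuracy(y_hat, y) / len(y)  # 0.5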
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
def evaluate_accuracy(net, data_iter):
"""Compute the accuracy for a model on a dataset."""
metric = Accumulator(2) # No. of correct predictions, no. of predictions
for X, y in data_iter:
metric.add(accuracy(net(X), y), d2l.size(y))
return metric[0] / metric[1]
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
class Accumulator:
    """For accumulating sums over `n` variables."""
    def __init__(self, n):
        self.data = [0.0] * n

    def add(self, *args):
        self.data = [a + float(b) for a, b in zip(self.data, args)]

    def __getitem__(self, idx):
        return self.data[idx]
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
def train_epoch_ch3(net, train_iter, loss, updater):
"""The training loop defined in Chapter 3."""
# Sum of training loss, sum of training accuracy, no. of examples
metric = Accumulator(3)
for X, y in train_iter:
# Compute gradients and update parameters
with tf.GradientTape() as tape:
y_hat = net(X)
            # Keras loss implementations take (labels, predictions) rather
            # than the (predictions, labels) convention used in this book,
            # e.g. in the `cross_entropy` loss implemented earlier
if isinstance(loss, tf.keras.losses.Loss):
l = loss(y, y_hat)
else:
l = loss(y_hat, y)
if isinstance(updater, tf.keras.optimizers.Optimizer):
params = net.trainable_variables
grads = tape.gradient(l, params)
updater.apply_gradients(zip(grads, params))
else:
updater(X.shape[0], tape.gradient(l, updater.params))
# Keras loss by default returns the average loss in a batch
l_sum = l * float(tf.size(y)) if isinstance(
loss, tf.keras.losses.Loss) else tf.reduce_sum(l)
metric.add(l_sum, accuracy(y_hat, y), tf.size(y))
# Return training loss and training accuracy
return metric[0] / metric[2], metric[1] / metric[2]
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
class Animator:
"""For plotting data in animation."""
def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
ylim=None, xscale='linear', yscale='linear',
fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
figsize=(3.5, 2.5)):
# Incrementally plot multiple lines
if legend is None:
legend = []
d2l.use_svg_display()
self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
if nrows * ncols == 1:
self.axes = [self.axes,]
# Use a lambda function to capture arguments
self.config_axes = lambda: d2l.set_axes(self.axes[
0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
self.X, self.Y, self.fmts = None, None, fmts
    def add(self, x, y):
# Add multiple data points into the figure
if not hasattr(y, "__len__"):
y = [y]
n = len(y)
if not hasattr(x, "__len__"):
x = [x] * n
if not self.X:
self.X = [[] for _ in range(n)]
if not self.Y:
self.Y = [[] for _ in range(n)]
for i, (a, b) in enumerate(zip(x, y)):
if a is not None and b is not None:
self.X[i].append(a)
self.Y[i].append(b)
self.axes[0].cla()
for x, y, fmt in zip(self.X, self.Y, self.fmts):
self.axes[0].plot(x, y, fmt)
self.config_axes()
display.display(self.fig)
display.clear_output(wait=True)
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
def train_ch3(net, train_iter, test_iter, loss, num_epochs, updater):
"""Train a model (defined in Chapter 3)."""
animator = Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0.3, 0.9],
legend=['train loss', 'train acc', 'test acc'])
for epoch in range(num_epochs):
train_metrics = train_epoch_ch3(net, train_iter, loss, updater)
test_acc = evaluate_accuracy(net, test_iter)
animator.add(epoch + 1, train_metrics + (test_acc,))
train_loss, train_acc = train_metrics
assert train_loss < 0.5, train_loss
assert train_acc <= 1 and train_acc > 0.7, train_acc
assert test_acc <= 1 and test_acc > 0.7, test_acc
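# Usage sketch (illustrative only): training a hypothetical model `net` on
# the Fashion-MNIST iterators with a Keras loss and optimizer.
#   train_iter, test_iter = load_data_fashion_mnist(batch_size=256)
#   loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
#   trainer = tf.keras.optimizers.SGD(learning_rate=0.1)
#   train_ch3(net, train_iter, test_iter, loss, 10, trainer)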
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
class Updater():
"""For updating parameters using minibatch stochastic gradient descent."""
def __init__(self, params, lr):
self.params = params
self.lr = lr
def __call__(self, batch_size, grads):
d2l.sgd(self.params, grads, self.lr, batch_size)
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
def predict_ch3(net, test_iter, n=6):
"""Predict labels (defined in Chapter 3)."""
for X, y in test_iter:
break
trues = d2l.get_fashion_mnist_labels(y)
preds = d2l.get_fashion_mnist_labels(d2l.argmax(net(X), axis=1))
titles = [true + '\n' + pred for true, pred in zip(trues, preds)]
d2l.show_images(d2l.reshape(X[0:n], (n, 28, 28)), 1, n,
titles=titles[0:n])
# Defined in file: ./chapter_multilayer-perceptrons/underfit-overfit.md
def evaluate_loss(net, data_iter, loss):
"""Evaluate the loss of a model on the given dataset."""
metric = d2l.Accumulator(2) # Sum of losses, no. of examples
for X, y in data_iter:
l = loss(net(X), y)
metric.add(d2l.reduce_sum(l), d2l.size(l))
return metric[0] / metric[1]
# Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md
DATA_HUB = dict()
DATA_URL = 'http://d2l-data.s3-accelerate.amazonaws.com/'
# Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md
def download(name, cache_dir=os.path.join('..', 'data')):
"""Download a file inserted into DATA_HUB, return the local filename."""
assert name in DATA_HUB, f"{name} does not exist in {DATA_HUB}."
url, sha1_hash = DATA_HUB[name]
os.makedirs(cache_dir, exist_ok=True)
fname = os.path.join(cache_dir, url.split('/')[-1])
if os.path.exists(fname):
sha1 = hashlib.sha1()
with open(fname, 'rb') as f:
while True:
data = f.read(1048576)
if not data:
break
sha1.update(data)
if sha1.hexdigest() == sha1_hash:
return fname # Hit cache
print(f'Downloading {fname} from {url}...')
r = requests.get(url, stream=True, verify=True)
with open(fname, 'wb') as f:
f.write(r.content)
return fname
# Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md
def download_extract(name, folder=None):
"""Download and extract a zip/tar file."""
fname = download(name)
base_dir = os.path.dirname(fname)
data_dir, ext = os.path.splitext(fname)
if ext == '.zip':
fp = zipfile.ZipFile(fname, 'r')
elif ext in ('.tar', '.gz'):
fp = tarfile.open(fname, 'r')
else:
assert False, 'Only zip/tar files can be extracted.'
fp.extractall(base_dir)
return os.path.join(base_dir, folder) if folder else data_dir
def download_all():
"""Download all files in the DATA_HUB."""
for name in DATA_HUB:
download(name)
# Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md
DATA_HUB['kaggle_house_train'] = (DATA_URL + 'kaggle_house_pred_train.csv',
'585e9cc93e70b39160e7921475f9bcd7d31219ce')
DATA_HUB['kaggle_house_test'] = (DATA_URL + 'kaggle_house_pred_test.csv',
'fa19780a7b011d9b009e8bff8e99922a8ee2eb90')
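# Usage sketch (illustrative only): fetching a registered dataset by its
# DATA_HUB key; files are cached under `../data` and verified by SHA-1.
#   train_data = pd.read_csv(download('kaggle_house_train'))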
# Defined in file: ./chapter_deep-learning-computation/use-gpu.md
def try_gpu(i=0):
    """Return gpu(i) if it exists, otherwise return cpu()."""
if len(tf.config.experimental.list_physical_devices('GPU')) >= i + 1:
return tf.device(f'/GPU:{i}')
return tf.device('/CPU:0')
def try_all_gpus():
"""Return all available GPUs, or [cpu(),] if no GPU exists."""
num_gpus = len(tf.config.experimental.list_physical_devices('GPU'))
devices = [tf.device(f'/GPU:{i}') for i in range(num_gpus)]
return devices if devices else [tf.device('/CPU:0')]
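# Usage sketch (illustrative only): both helpers return `tf.device` context
# managers, so computation can be placed on the first GPU if one exists.
#   with try_gpu():
#       X = tf.ones((2, 3))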
# Defined in file: ./chapter_convolutional-neural-networks/conv-layer.md
def corr2d(X, K):
"""Compute 2D cross-correlation."""
h, w = K.shape
Y = tf.Variable(tf.zeros((X.shape[0] - h + 1, X.shape[1] - w + 1)))
for i in range(Y.shape[0]):
for j in range(Y.shape[1]):
Y[i, j].assign(tf.reduce_sum(X[i:i + h, j:j + w] * K))
return Y
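# Usage sketch (illustrative only): cross-correlating a 3 x 3 input with a
# 2 x 2 kernel.
#   X = d2l.tensor([[0.0, 1.0, 2.0], [3.0, 4.0, 5.0], [6.0, 7.0, 8.0]])
#   K = d2l.tensor([[0.0, 1.0], [2.0, 3.0]])
#   corr2d(X, K)  # [[19., 25.], [37., 43.]]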
# Defined in file: ./chapter_convolutional-neural-networks/lenet.md
class TrainCallback(tf.keras.callbacks.Callback):
    """A callback to visualize the training progress."""
def __init__(self, net, train_iter, test_iter, num_epochs, device_name):
self.timer = d2l.Timer()
self.animator = d2l.Animator(
xlabel='epoch', xlim=[1, num_epochs],
legend=['train loss', 'train acc', 'test acc'])
self.net = net
self.train_iter = train_iter
self.test_iter = test_iter
self.num_epochs = num_epochs
self.device_name = device_name
    def on_epoch_end(self, epoch, logs):
self.timer.stop()
test_acc = self.net.evaluate(self.test_iter, verbose=0,
return_dict=True)['accuracy']
metrics = (logs['loss'], logs['accuracy'], test_acc)
self.animator.add(epoch + 1, metrics)
if epoch == self.num_epochs - 1:
batch_size = next(iter(self.train_iter))[0].shape[0]
num_examples = batch_size * tf.data.experimental.cardinality(
self.train_iter).numpy()
print(f'loss {metrics[0]:.3f}, train acc {metrics[1]:.3f}, '
f'test acc {metrics[2]:.3f}')
print(f'{num_examples / self.timer.avg():.1f} examples/sec on '
f'{str(self.device_name)}')
def train_ch6(net_fn, train_iter, test_iter, num_epochs, lr, device):
"""Train a model with a GPU (defined in Chapter 6)."""
device_name = device._device_name
strategy = tf.distribute.OneDeviceStrategy(device_name)
with strategy.scope():
optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
net = net_fn()
net.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
callback = TrainCallback(net, train_iter, test_iter, num_epochs,
device_name)
net.fit(train_iter, epochs=num_epochs, verbose=0, callbacks=[callback])
return net
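# Usage sketch (illustrative only): training a hypothetical `net_fn` that
# builds and returns a fresh Keras model (e.g. LeNet) on Fashion-MNIST.
#   train_iter, test_iter = load_data_fashion_mnist(batch_size=256)
#   net = train_ch6(net_fn, train_iter, test_iter, 10, 0.9, d2l.try_gpu())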
# Defined in file: ./chapter_convolutional-modern/resnet.md
class Residual(tf.keras.Model):
"""The Residual block of ResNet."""
def __init__(self, num_channels, use_1x1conv=False, strides=1):
super().__init__()
self.conv1 = tf.keras.layers.Conv2D(num_channels, padding='same',
kernel_size=3, strides=strides)
self.conv2 = tf.keras.layers.Conv2D(num_channels, kernel_size=3,
padding='same')
self.conv3 = None
if use_1x1conv:
self.conv3 = tf.keras.layers.Conv2D(num_channels, kernel_size=1,
strides=strides)
self.bn1 = tf.keras.layers.BatchNormalization()
self.bn2 = tf.keras.layers.BatchNormalization()
    def call(self, X):
Y = tf.keras.activations.relu(self.bn1(self.conv1(X)))
Y = self.bn2(self.conv2(Y))
if self.conv3 is not None:
X = self.conv3(X)
Y += X
return tf.keras.activations.relu(Y)
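# Usage sketch (illustrative only): a residual block that halves the height
# and width while doubling the channels of a hypothetical NHWC input.
#   blk = Residual(6, use_1x1conv=True, strides=2)
#   X = tf.random.uniform((4, 6, 6, 3))
#   blk(X).shape  # TensorShape([4, 3, 3, 6])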
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')
def read_time_machine():
"""Load the time machine dataset into a list of text lines."""
with open(d2l.download('time_machine'), 'r') as f:
lines = f.readlines()
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md
def tokenize(lines, token='word'):
"""Split text lines into word or character tokens."""
if token == 'word':
return [line.split() for line in lines]
elif token == 'char':
return [list(line) for line in lines]
else:
print('ERROR: unknown token type: ' + token)
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md
class Vocab:
"""Vocabulary for text."""
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# Sort according to frequencies
counter = count_corpus(tokens)
self.token_freqs = sorted(counter.items(), key=lambda x: x[1],
reverse=True)
# The index for the unknown token is 0
self.unk, uniq_tokens = 0, ['<unk>'] + reserved_tokens
uniq_tokens += [
token for token, freq in self.token_freqs
if freq >= min_freq and token not in uniq_tokens]
self.idx_to_token, self.token_to_idx = [], dict()
for token in uniq_tokens:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
def __len__(self):
return len(self.idx_to_token)
def __getitem__(self, tokens):
if not isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
    def to_tokens(self, indices):
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]
def count_corpus(tokens):
"""Count token frequencies."""
# Here `tokens` is a 1D list or 2D list
if len(tokens) == 0 or isinstance(tokens[0], list):
# Flatten a list of token lists into a list of tokens
tokens = [token for line in tokens for token in line]
return collections.Counter(tokens)
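# Usage sketch (illustrative only): building a character-level vocabulary
# from the time machine text.
#   tokens = tokenize(read_time_machine(), 'char')
#   vocab = Vocab(tokens)
#   vocab['t'], vocab.to_tokens([0])  # index of 't', ['<unk>']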
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md
def load_corpus_time_machine(max_tokens=-1):
"""Return token indices and the vocabulary of the time machine dataset."""
lines = read_time_machine()
tokens = tokenize(lines, 'char')
vocab = Vocab(tokens)
# Since each text line in the time machine dataset is not necessarily a
# sentence or a paragraph, flatten all the text lines into a single list
corpus = [vocab[token] for line in tokens for token in line]
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
def seq_data_iter_random(corpus, batch_size, num_steps):
"""Generate a minibatch of subsequences using random sampling."""
# Start with a random offset (inclusive of `num_steps - 1`) to partition a
# sequence
corpus = corpus[random.randint(0, num_steps - 1):]
# Subtract 1 since we need to account for labels
num_subseqs = (len(corpus) - 1) // num_steps
# The starting indices for subsequences of length `num_steps`
initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
# In random sampling, the subsequences from two adjacent random
# minibatches during iteration are not necessarily adjacent on the
# original sequence
random.shuffle(initial_indices)
def data(pos):
# Return a sequence of length `num_steps` starting from `pos`
return corpus[pos:pos + num_steps]
num_batches = num_subseqs // batch_size
for i in range(0, batch_size * num_batches, batch_size):
# Here, `initial_indices` contains randomized starting indices for
# subsequences
initial_indices_per_batch = initial_indices[i:i + batch_size]
X = [data(j) for j in initial_indices_per_batch]
Y = [data(j + 1) for j in initial_indices_per_batch]
yield d2l.tensor(X), d2l.tensor(Y)
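# Usage sketch (illustrative only): random-sampling minibatches over a toy
# corpus of token indices 0..34.
#   for X, Y in seq_data_iter_random(list(range(35)), batch_size=2,
#                                    num_steps=5):
#       print(X.shape, Y.shape)  # (2, 5) (2, 5)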
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
def seq_data_iter_sequential(corpus, batch_size, num_steps):
"""Generate a minibatch of subsequences using sequential partitioning."""
# Start with a random offset to partition a sequence
offset = random.randint(0, num_steps)
num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
Xs = d2l.tensor(corpus[offset:offset + num_tokens])
Ys = d2l.tensor(corpus[offset + 1:offset + 1 + num_tokens])
Xs = d2l.reshape(Xs, (batch_size, -1))
Ys = d2l.reshape(Ys, (batch_size, -1))
num_batches = Xs.shape[1] // num_steps
for i in range(0, num_batches * num_steps, num_steps):
X = Xs[:, i:i + num_steps]
Y = Ys[:, i:i + num_steps]
yield X, Y
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
class SeqDataLoader:
"""An iterator to load sequence data."""
def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
if use_random_iter:
self.data_iter_fn = d2l.seq_data_iter_random
else:
self.data_iter_fn = d2l.seq_data_iter_sequential
self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens)
self.batch_size, self.num_steps = batch_size, num_steps
def __iter__(self):
return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
def load_data_time_machine(batch_size, num_steps, use_random_iter=False,
max_tokens=10000):
"""Return the iterator and the vocabulary of the time machine dataset."""
data_iter = SeqDataLoader(batch_size, num_steps, use_random_iter,
max_tokens)
return data_iter, data_iter.vocab
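# Usage sketch (illustrative only): minibatches of shape
# (batch_size, num_steps) over the character-level time machine corpus.
#   train_iter, vocab = load_data_time_machine(batch_size=32, num_steps=35)
#   for X, Y in train_iter:
#       print(X.shape, Y.shape)  # (32, 35) (32, 35)
#       break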
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
class RNNModelScratch:
    """An RNN model implemented from scratch."""
def __init__(self, vocab_size, num_hiddens, init_state, forward_fn):
self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
self.init_state, self.forward_fn = init_state, forward_fn
def __call__(self, X, state, params):
X = tf.one_hot(tf.transpose(X), self.vocab_size)
X = tf.cast(X, tf.float32)
return self.forward_fn(X, state, params)
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
def predict_ch8(prefix, num_preds, net, vocab, params):
"""Generate new characters following the `prefix`."""
state = net.begin_state(batch_size=1)
outputs = [vocab[prefix[0]]]
get_input = lambda: d2l.reshape(d2l.tensor([outputs[-1]]), (1, 1)).numpy()
for y in prefix[1:]: # Warm-up period
_, state = net(get_input(), state, params)
outputs.append(vocab[y])
for _ in range(num_preds): # Predict `num_preds` steps
y, state = net(get_input(), state, params)
outputs.append(int(y.numpy().argmax(axis=1).reshape(1)))
return ''.join([vocab.idx_to_token[i] for i in outputs])
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
def grad_clipping(grads, theta):
"""Clip the gradient."""
theta = tf.constant(theta, dtype=tf.float32)
norm = tf.math.sqrt(
sum((tf.reduce_sum(grad**2)).numpy() for grad in grads))
norm = tf.cast(norm, tf.float32)
new_grad = []
if tf.greater(norm, theta):
for grad in grads:
new_grad.append(grad * theta / norm)
else:
for grad in grads:
new_grad.append(grad)
return new_grad
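# Usage sketch (illustrative only): rescaling gradients from a hypothetical
# `tf.GradientTape` so that their global L2 norm is at most 1 before the
# optimizer step.
#   grads = tape.gradient(l, params)
#   grads = grad_clipping(grads, 1)
#   updater.apply_gradients(zip(grads, params))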
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
def train_epoch_ch8(net, train_iter, loss, updater, params, use_random_iter):
"""Train a model within one epoch (defined in Chapter 8)."""
state, timer = None, d2l.Timer()
metric = d2l.Accumulator(2) # Sum of training loss, no. of tokens
for X, Y in train_iter:
if state is None or use_random_iter:
# Initialize `state` when either it is the first iteration or
# using random sampling
state = net.begin_state(batch_size=X.shape[0])
with tf.GradientTape(persistent=True) as g:
g.watch(params)
y_hat, state = net(X, state, params)
            y = d2l.reshape(tf.transpose(Y), (-1,))
l = loss(y, y_hat)
grads = g.gradient(l, params)
grads = grad_clipping(grads, 1)
updater.apply_gradients(zip(grads, params))
# Keras loss by default returns the average loss in a batch
# l_sum = l * float(d2l.size(y)) if isinstance(
# loss, tf.keras.losses.Loss) else tf.reduce_sum(l)
metric.add(l * d2l.size(y), d2l.size(y))
return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
def train_ch8(net, train_iter, vocab, num_hiddens, lr, num_epochs, strategy,
use_random_iter=False):
"""Train a model (defined in Chapter 8)."""
with strategy.scope():
params = get_params(len(vocab), num_hiddens)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
updater = tf.keras.optimizers.SGD(lr)
animator = d2l.Animator(xlabel='epoch', ylabel='perplexity',
legend=['train'], xlim=[10, num_epochs])
predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, params)
# Train and predict
for epoch in range(num_epochs):
ppl, speed = train_epoch_ch8(net, train_iter, loss, updater, params,
use_random_iter)
if (epoch + 1) % 10 == 0:
print(predict('time traveller'))
animator.add(epoch + 1, [ppl])
device = d2l.try_gpu()._device_name
print(f'perplexity {ppl:.1f}, {speed:.1f} tokens/sec on {str(device)}')
print(predict('time traveller'))
print(predict('traveller'))
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
'94646ad1522d915e7b0f9296181140edcf86a4f5')
def read_data_nmt():
"""Load the English-French dataset."""
data_dir = d2l.download_extract('fra-eng')
with open(os.path.join(data_dir, 'fra.txt'), 'r') as f:
return f.read()
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
def preprocess_nmt(text):
"""Preprocess the English-French dataset."""
def no_space(char, prev_char):
return char in set(',.!?') and prev_char != ' '
# Replace non-breaking space with space, and convert uppercase letters to
# lowercase ones
text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
# Insert space between words and punctuation marks
out = [
' ' + char if i > 0 and no_space(char, text[i - 1]) else char
for i, char in enumerate(text)]
return ''.join(out)
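# Usage sketch (illustrative only): punctuation is separated from words and
# everything is lowercased.
#   preprocess_nmt('Go.\tVa !')  # 'go .\tva !'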
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
def tokenize_nmt(text, num_examples=None):
"""Tokenize the English-French dataset."""
source, target = [], []
for i, line in enumerate(text.split('\n')):
if num_examples and i > num_examples:
break
parts = line.split('\t')
if len(parts) == 2:
source.append(parts[0].split(' '))
target.append(parts[1].split(' '))
return source, target
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
def truncate_pad(line, num_steps, padding_token):
"""Truncate or pad sequences."""
if len(line) > num_steps:
return line[:num_steps] # Truncate
return line + [padding_token] * (num_steps - len(line)) # Pad
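# Usage sketch (illustrative only): fixing token-index sequences to length
# 10 with a hypothetical padding index of 1.
#   truncate_pad([47, 4], 10, 1)          # padded to length 10
#   truncate_pad(list(range(12)), 10, 1)  # truncated to the first 10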
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
def build_array_nmt(lines, vocab, num_steps):
"""Transform text sequences of machine translation into minibatches."""
lines = [vocab[l] for l in lines]
lines = [l + [vocab['<eos>']] for l in lines]
array = d2l.tensor([
truncate_pad(l, num_steps, vocab['<pad>']) for l in lines])
valid_len = d2l.reduce_sum(d2l.astype(array != vocab['<pad>'], d2l.int32),
1)
return array, valid_len
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
def load_data_nmt(batch_size, num_steps, num_examples=600):
"""Return the iterator and the vocabularies of the translation dataset."""
text = preprocess_nmt(read_data_nmt())
source, target = tokenize_nmt(text, num_examples)
src_vocab = d2l.Vocab(source, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
tgt_vocab = d2l.Vocab(target, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
data_iter = d2l.load_array(data_arrays, batch_size)
return data_iter, src_vocab, tgt_vocab
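# Usage sketch (illustrative only): each minibatch holds padded source and
# target arrays plus their valid (non-padding) lengths.
#   train_iter, src_vocab, tgt_vocab = load_data_nmt(batch_size=2,
#                                                    num_steps=8)
#   for X, X_valid_len, Y, Y_valid_len in train_iter:
#       print(X.shape, Y.shape)  # (2, 8) (2, 8)
#       break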
# Defined in file: ./chapter_attention-mechanisms/attention-cues.md
def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5),
cmap='Reds'):
d2l.use_svg_display()
num_rows, num_cols = matrices.shape[0], matrices.shape[1]
fig, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
sharex=True, sharey=True, squeeze=False)
for i, (row_axes, row_matrices) in enumerate(zip(axes, matrices)):
for j, (ax, matrix) in enumerate(zip(row_axes, row_matrices)):
pcm = ax.imshow(d2l.numpy(matrix), cmap=cmap)
if i == num_rows - 1:
ax.set_xlabel(xlabel)
if j == 0:
ax.set_ylabel(ylabel)
if titles:
ax.set_title(titles[j])
fig.colorbar(pcm, ax=axes, shrink=0.6)
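# Usage sketch (illustrative only): visualizing an identity attention-weight
# matrix; `matrices` must be 4D with shape (rows, columns, queries, keys).
#   attention_weights = d2l.reshape(d2l.eye(10), (1, 1, 10, 10))
#   show_heatmaps(attention_weights, xlabel='Keys', ylabel='Queries')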
# Defined in file: ./chapter_optimization/optimization-intro.md
def annotate(text, xy, xytext):
d2l.plt.gca().annotate(text, xy=xy, xytext=xytext,
arrowprops=dict(arrowstyle='->'))
# Defined in file: ./chapter_optimization/gd.md
def train_2d(trainer, steps=20):
"""Optimize a 2-dim objective function with a customized trainer."""
# s1 and s2 are internal state variables and will
# be used later in the chapter
x1, x2, s1, s2 = -5, -2, 0, 0
results = [(x1, x2)]
for i in range(steps):
x1, x2, s1, s2 = trainer(x1, x2, s1, s2)
results.append((x1, x2))
return results
def show_trace_2d(f, results):
"""Show the trace of 2D variables during optimization."""
d2l.set_figsize()
d2l.plt.plot(*zip(*results), '-o', color='#ff7f0e')
x1, x2 = d2l.meshgrid(d2l.arange(-5.5, 1.0, 0.1),
d2l.arange(-3.0, 1.0, 0.1))
d2l.plt.contour(x1, x2, f(x1, x2), colors='#1f77b4')
d2l.plt.xlabel('x1')
d2l.plt.ylabel('x2')
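# Usage sketch (illustrative only): gradient descent on
# f(x1, x2) = x1**2 + 2 * x2**2 with a hypothetical learning rate `eta`.
#   eta = 0.1
#   f = lambda x1, x2: x1 ** 2 + 2 * x2 ** 2
#   gd = lambda x1, x2, s1, s2: (x1 - eta * 2 * x1, x2 - eta * 4 * x2, 0, 0)
#   show_trace_2d(f, train_2d(gd))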
# Defined in file: ./chapter_optimization/minibatch-sgd.md
d2l.DATA_HUB['airfoil'] = (d2l.DATA_URL + 'airfoil_self_noise.dat',
'76e5be1548fd8222e5074cf0faae75edff8cf93f')
def get_data_ch11(batch_size=10, n=1500):
data = np.genfromtxt(d2l.download('airfoil'), dtype=np.float32,
delimiter='\t')
data = (data - data.mean(axis=0)) / data.std(axis=0)
data_iter = d2l.load_array((data[:n, :-1], data[:n, -1]), batch_size,
is_train=True)
return data_iter, data.shape[1] - 1
# Defined in file: ./chapter_optimization/minibatch-sgd.md
def train_ch11(trainer_fn, states, hyperparams, data_iter, feature_dim,
num_epochs=2):
# Initialization
w = tf.Variable(
tf.random.normal(shape=(feature_dim, 1), mean=0, stddev=0.01),
trainable=True)
b = tf.Variable(tf.zeros(1), trainable=True)
# Train
net, loss = lambda X: d2l.linreg(X, w, b), d2l.squared_loss
animator = d2l.Animator(xlabel='epoch', ylabel='loss',
xlim=[0, num_epochs], ylim=[0.22, 0.35])
n, timer = 0, d2l.Timer()
for _ in range(num_epochs):
for X, y in data_iter:
with tf.GradientTape() as g:
l = tf.math.reduce_mean(loss(net(X), y))
dw, db = g.gradient(l, [w, b])
trainer_fn([w, b], [dw, db], states, hyperparams)
n += X.shape[0]
if n % 200 == 0:
timer.stop()
p = n / X.shape[0]
q = p / tf.data.experimental.cardinality(data_iter).numpy()
r = (d2l.evaluate_loss(net, data_iter, loss),)
animator.add(q, r)
timer.start()
print(f'loss: {animator.Y[0][-1]:.3f}, {timer.avg():.3f} sec/epoch')
return timer.cumsum(), animator.Y[0]
# Defined in file: ./chapter_optimization/minibatch-sgd.md
def train_concise_ch11(trainer_fn, hyperparams, data_iter, num_epochs=2):
# Initialization
net = tf.keras.Sequential()
net.add(
tf.keras.layers.Dense(
1, kernel_initializer=tf.random_normal_initializer(stddev=0.01)))
optimizer = trainer_fn(**hyperparams)
loss = tf.keras.losses.MeanSquaredError()
    # Note: L2 loss = 1/2 * MSE loss. TensorFlow's `MeanSquaredError` differs
    # from MXNet's `L2Loss` by a factor of 2, so we halve the loss value here
    # to recover the L2 loss
animator = d2l.Animator(xlabel='epoch', ylabel='loss',
xlim=[0, num_epochs], ylim=[0.22, 0.35])
n, timer = 0, d2l.Timer()
for _ in range(num_epochs):
for X, y in data_iter:
with tf.GradientTape() as g:
out = net(X)
l = loss(y, out) / 2
params = net.trainable_variables
grads = g.gradient(l, params)
optimizer.apply_gradients(zip(grads, params))
n += X.shape[0]
if n % 200 == 0:
timer.stop()
p = n / X.shape[0]
q = p / tf.data.experimental.cardinality(data_iter).numpy()
r = (d2l.evaluate_loss(net, data_iter, loss) / 2,)
animator.add(q, r)
timer.start()
print(f'loss: {animator.Y[0][-1]:.3f}, {timer.avg():.3f} sec/epoch')
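# Usage sketch (illustrative only): concise training with a Keras optimizer
# on the airfoil data returned by `get_data_ch11`.
#   data_iter, _ = get_data_ch11(batch_size=10)
#   train_concise_ch11(tf.keras.optimizers.SGD, {'learning_rate': 0.05},
#                      data_iter)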
# Defined in file: ./chapter_computer-vision/bounding-box.md
def box_corner_to_center(boxes):
"""Convert from (upper_left, bottom_right) to (center, width, height)"""
x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
cx = (x1 + x2) / 2
cy = (y1 + y2) / 2
w = x2 - x1
h = y2 - y1
boxes = d2l.stack((cx, cy, w, h), axis=-1)
return boxes
def box_center_to_corner(boxes):
"""Convert from (center, width, height) to (upper_left, bottom_right)"""
cx, cy, w, h = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
x1 = cx - 0.5 * w
y1 = cy - 0.5 * h
x2 = cx + 0.5 * w
y2 = cy + 0.5 * h
boxes = d2l.stack((x1, y1, x2, y2), axis=-1)
return boxes
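# Usage sketch (illustrative only): the two conversions are inverses of each
# other for a hypothetical pair of boxes.
#   boxes = d2l.tensor([[60.0, 45.0, 378.0, 516.0],
#                       [400.0, 112.0, 655.0, 493.0]])
#   box_center_to_corner(box_corner_to_center(boxes)) == boxes  # all True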
# Defined in file: ./chapter_computer-vision/bounding-box.md
def bbox_to_rect(bbox, color):
"""Convert bounding box to matplotlib format."""
# Convert the bounding box (top-left x, top-left y, bottom-right x,
# bottom-right y) format to matplotlib format: ((upper-left x,
# upper-left y), width, height)
return d2l.plt.Rectangle(xy=(bbox[0], bbox[1]), width=bbox[2] - bbox[0],
height=bbox[3] - bbox[1], fill=False,
edgecolor=color, linewidth=2)
# Aliases defined in config.ini
size = lambda a: tf.size(a).numpy()
reshape = tf.reshape
ones = tf.ones
zeros = tf.zeros
meshgrid = tf.meshgrid
sin = tf.sin
sinh = tf.sinh
cos = tf.cos
cosh = tf.cosh
tanh = tf.tanh
linspace = tf.linspace
exp = tf.exp
normal = tf.random.normal
rand = tf.random.uniform
matmul = tf.matmul
reduce_sum = tf.reduce_sum
argmax = tf.argmax
tensor = tf.constant
arange = tf.range
astype = tf.cast
int32 = tf.int32
float32 = tf.float32
transpose = tf.transpose
concat = tf.concat
stack = tf.stack
abs = tf.abs
eye = tf.eye
numpy = lambda x, *args, **kwargs: x.numpy(*args, **kwargs)