Source code for d2l.tensorflow
Open the notebook in Colab
Open the notebook in Colab
Open the notebook in Colab

# This file is generated automatically through:
#    d2lbook build lib
# Don't edit it directly

# Defined in file: ./chapter_preface/index.md
import collections
import hashlib
import math
import os
import random
import re
import shutil
import sys
import tarfile
import time
import zipfile
from collections import defaultdict

import pandas as pd
import requests
from IPython import display
from matplotlib import pyplot as plt

# Bind this very module object to the name `d2l`, so that the definitions
# below can refer to one another through the `d2l.<name>` spelling.
d2l = sys.modules[__name__]


# Defined in file: ./chapter_preface/index.md
import numpy as np
import tensorflow as tf


# Defined in file: ./chapter_preliminaries/calculus.md
def use_svg_display():
    """Ask IPython to render matplotlib figures in the SVG format."""
    image_format = 'svg'
    display.set_matplotlib_formats(image_format)
# Defined in file: ./chapter_preliminaries/calculus.md
def set_figsize(figsize=(3.5, 2.5)):
    """Switch to SVG output and store `figsize` as the default figure size.

    The size is written to matplotlib's `rcParams['figure.figsize']`.
    """
    use_svg_display()
    rc_params = d2l.plt.rcParams
    rc_params['figure.figsize'] = figsize
# Defined in file: ./chapter_preliminaries/calculus.md
def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):
    """Apply labels, scales, limits, an optional legend and a grid to `axes`.

    The legend is drawn only when `legend` is truthy; the grid is always
    switched on.
    """
    # (setter, value) pairs applied in order: labels, then scales, then limits
    settings = (
        (axes.set_xlabel, xlabel),
        (axes.set_ylabel, ylabel),
        (axes.set_xscale, xscale),
        (axes.set_yscale, yscale),
        (axes.set_xlim, xlim),
        (axes.set_ylim, ylim),
    )
    for apply_setting, value in settings:
        apply_setting(value)
    if legend:
        axes.legend(legend)
    axes.grid()
# Defined in file: ./chapter_preliminaries/calculus.md
def plot(X, Y=None, xlabel=None, ylabel=None, legend=None, xlim=None,
         ylim=None, xscale='linear', yscale='linear',
         fmts=('-', 'm--', 'g-.', 'r:'), figsize=(3.5, 2.5), axes=None):
    """Draw one or several curves on `axes` (the current axes by default).

    `X` and `Y` may each be a single 1-axis sequence or a collection of them.
    When `Y` is omitted, `X` is taken as the y-values and the curves are
    plotted against their index. Curves are paired with the format strings
    in `fmts`; the axes are cleared before drawing.
    """
    if legend is None:
        legend = []

    set_figsize(figsize)
    if not axes:
        axes = d2l.plt.gca()

    def _is_single_series(data):
        # True for an object with `ndim == 1`, or a list whose first element
        # has no length (i.e. a flat list of scalars)
        if hasattr(data, "ndim") and data.ndim == 1:
            return True
        return isinstance(data, list) and not hasattr(data[0], "__len__")

    if _is_single_series(X):
        X = [X]
    if Y is None:
        # Only one argument was given: it holds the y-values
        X, Y = [[]] * len(X), X
    elif _is_single_series(Y):
        Y = [Y]
    if len(X) != len(Y):
        # Reuse the x-values for every curve in `Y`
        X = X * len(Y)

    axes.cla()
    for x_values, y_values, fmt in zip(X, Y, fmts):
        if len(x_values):
            axes.plot(x_values, y_values, fmt)
        else:
            axes.plot(y_values, fmt)
    set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
# Defined in file: ./chapter_linear-networks/linear-regression.md
class Timer:
    """Stopwatch that keeps a history of the intervals it measured."""

    def __init__(self):
        # Durations (in seconds) of all completed intervals
        self.times = []
        self.start()

    def start(self):
        """Remember the current wall-clock time as the interval start."""
        self.tik = time.time()

    def stop(self):
        """Close the running interval, record its length and return it."""
        elapsed = time.time() - self.tik
        self.times.append(elapsed)
        return elapsed

    def avg(self):
        """Return the mean of the recorded durations."""
        total = sum(self.times)
        return total / len(self.times)

    def sum(self):
        """Return the total of the recorded durations."""
        # Inside a method body `sum` still resolves to the builtin
        return sum(self.times)

    def cumsum(self):
        """Return the running totals of the recorded durations as a list."""
        return np.cumsum(np.array(self.times)).tolist()
# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
def synthetic_data(w, b, num_examples):
    """Sample features X ~ N(0, 1) and labels y = Xw + b + noise.

    The label noise is drawn with standard deviation 0.01. Returns the pair
    `(X, y)` where `y` is reshaped to a single column.
    """
    features = d2l.zeros((num_examples, w.shape[0]))
    features = features + tf.random.normal(shape=features.shape)
    weights_column = tf.reshape(w, (-1, 1))
    labels = d2l.matmul(features, weights_column) + b
    labels = labels + tf.random.normal(shape=labels.shape, stddev=0.01)
    return features, d2l.reshape(labels, (-1, 1))
# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
def linreg(X, w, b):
    """Linear regression prediction: the matrix product `X w` plus `b`."""
    projection = d2l.matmul(X, w)
    return projection + b
# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
def squared_loss(y_hat, y):
    """Element-wise halved squared error between `y_hat` and `y`.

    `y` is reshaped to the shape of `y_hat` before subtracting.
    """
    target = d2l.reshape(y, y_hat.shape)
    residual = y_hat - target
    return residual**2 / 2
# Defined in file: ./chapter_linear-networks/linear-regression-scratch.md
def sgd(params, grads, lr, batch_size):
    """One minibatch SGD step, applied in place to every parameter.

    Each parameter is decreased by `lr * grad / batch_size` through its
    `assign_sub` method.
    """
    for variable, gradient in zip(params, grads):
        step = lr * gradient / batch_size
        variable.assign_sub(step)
# Defined in file: ./chapter_linear-networks/linear-regression-concise.md
def load_array(data_arrays, batch_size, is_train=True):
    """Wrap `data_arrays` in a batched `tf.data.Dataset`.

    For training data the examples are shuffled (buffer of 1000) before
    batching.
    """
    examples = tf.data.Dataset.from_tensor_slices(data_arrays)
    if is_train:
        examples = examples.shuffle(buffer_size=1000)
    return examples.batch(batch_size)
# Defined in file: ./chapter_linear-networks/image-classification-dataset.md
def get_fashion_mnist_labels(labels):
    """Translate numeric Fashion-MNIST labels into their text names."""
    names = ('t-shirt', 'trouser', 'pullover', 'dress', 'coat', 'sandal',
             'shirt', 'sneaker', 'bag', 'ankle boot')
    text = []
    for label in labels:
        # Labels may arrive as floats or tensor scalars; `int` normalizes them
        text.append(names[int(label)])
    return text
# Defined in file: ./chapter_linear-networks/image-classification-dataset.md
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):
    """Plot a list of images on a `num_rows` x `num_cols` grid.

    Axis ticks are hidden for every drawn image. When `titles` is given,
    `titles[i]` is used as the title of the i-th image. Returns the
    flattened array of axes.
    """
    figsize = (num_cols * scale, num_rows * scale)
    # `squeeze=False` makes `subplots` always return a 2-D array of axes;
    # without it a 1x1 grid yields a bare Axes object that has no `flatten`.
    _, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
                               squeeze=False)
    axes = axes.flatten()
    for i, (ax, img) in enumerate(zip(axes, imgs)):
        ax.imshow(d2l.numpy(img))
        ax.axes.get_xaxis().set_visible(False)
        ax.axes.get_yaxis().set_visible(False)
        if titles:
            ax.set_title(titles[i])
    return axes
# Defined in file: ./chapter_linear-networks/image-classification-dataset.md
def load_data_fashion_mnist(batch_size, resize=None):
    """Download Fashion-MNIST and return (train, test) `tf.data` pipelines.

    Both pipelines are batched with `batch_size`; the training pipeline is
    additionally shuffled. When `resize` is given, images are resized with
    padding to `resize` x `resize`.
    """
    mnist_train, mnist_test = tf.keras.datasets.fashion_mnist.load_data()

    def process(X, y):
        # Scale pixels into [0, 1], append a trailing axis to the images,
        # and cast the labels to int32
        return (tf.expand_dims(X, axis=3) / 255, tf.cast(y, dtype='int32'))

    def resize_fn(X, y):
        return (tf.image.resize_with_pad(X, resize, resize) if resize else X,
                y)

    train_iter = tf.data.Dataset.from_tensor_slices(process(*mnist_train))
    train_iter = train_iter.batch(batch_size)
    train_iter = train_iter.shuffle(len(mnist_train[0]))
    train_iter = train_iter.map(resize_fn)

    test_iter = tf.data.Dataset.from_tensor_slices(process(*mnist_test))
    test_iter = test_iter.batch(batch_size).map(resize_fn)
    return (train_iter, test_iter)
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
def accuracy(y_hat, y):
    """Count how many predictions in `y_hat` equal the labels `y`.

    If `y_hat` has more than one column it is reduced with `argmax` along
    axis 1 first. The count is returned as a Python float.
    """
    has_class_scores = len(y_hat.shape) > 1 and y_hat.shape[1] > 1
    if has_class_scores:
        y_hat = d2l.argmax(y_hat, axis=1)
    matches = d2l.astype(y_hat, y.dtype) == y
    num_correct = d2l.reduce_sum(d2l.astype(matches, y.dtype))
    return float(num_correct)
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
def evaluate_accuracy(net, data_iter):
    """Return the fraction of examples in `data_iter` that `net` gets right."""
    # Running totals: number of correct predictions, number of predictions
    totals = Accumulator(2)
    for X, y in data_iter:
        totals.add(accuracy(net(X), y), d2l.size(y))
    num_correct, num_seen = totals[0], totals[1]
    return num_correct / num_seen
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
class Accumulator:
    """Keeps `n` running float sums that are updated together."""

    def __init__(self, n):
        self.data = [0.0 for _ in range(n)]

    def add(self, *args):
        """Add each positional argument to the sum at the same position."""
        updated = []
        for current, increment in zip(self.data, args):
            updated.append(current + float(increment))
        self.data = updated

    def reset(self):
        """Set every sum back to zero, keeping the number of slots."""
        self.data = [0.0 for _ in self.data]

    def __getitem__(self, idx):
        return self.data[idx]
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
def train_epoch_ch3(net, train_iter, loss, updater):
    """Train `net` for one pass over `train_iter` (Chapter 3 loop).

    Both Keras losses/optimizers and the book's hand-written loss functions
    and updaters are supported. Returns `(average loss, accuracy)` over the
    examples seen.
    """
    # Keras losses take (labels, predictions) and by default return the
    # batch average; the hand-written losses take (predictions, labels).
    uses_keras_loss = isinstance(loss, tf.keras.losses.Loss)
    uses_keras_optimizer = isinstance(updater, tf.keras.optimizers.Optimizer)
    # Running totals: loss sum, number of correct predictions, example count
    totals = Accumulator(3)
    for X, y in train_iter:
        with tf.GradientTape() as tape:
            y_hat = net(X)
            l = loss(y, y_hat) if uses_keras_loss else loss(y_hat, y)
        if uses_keras_optimizer:
            params = net.trainable_variables
            grads = tape.gradient(l, params)
            updater.apply_gradients(zip(grads, params))
        else:
            updater(X.shape[0], tape.gradient(l, updater.params))
        if uses_keras_loss:
            # Undo the batch averaging to obtain a loss sum
            l_sum = l * float(tf.size(y))
        else:
            l_sum = tf.reduce_sum(l)
        totals.add(l_sum, accuracy(y_hat, y), tf.size(y))
    return totals[0] / totals[2], totals[1] / totals[2]
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
class Animator:
    """Line chart that is redrawn every time new data points are added."""

    def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
                 ylim=None, xscale='linear', yscale='linear',
                 fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
                 figsize=(3.5, 2.5)):
        legend = [] if legend is None else legend
        d2l.use_svg_display()
        self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
        if nrows * ncols == 1:
            # A single subplot comes back as a bare object; wrap it so that
            # `self.axes[0]` works in every case
            self.axes = [self.axes]

        def configure_first_axes():
            # Closure over the constructor arguments
            d2l.set_axes(self.axes[0], xlabel, ylabel, xlim, ylim, xscale,
                         yscale, legend)

        self.config_axes = configure_first_axes
        self.X = None
        self.Y = None
        self.fmts = fmts

    def add(self, x, y):
        """Append one point per series and redraw the figure.

        `y` may be a scalar or a sequence with one value per series; a
        scalar `x` is shared by all series. Pairs containing `None` are
        skipped.
        """
        if not hasattr(y, "__len__"):
            y = [y]
        num_series = len(y)
        if not hasattr(x, "__len__"):
            x = [x] * num_series
        if not self.X:
            self.X = [[] for _ in range(num_series)]
        if not self.Y:
            self.Y = [[] for _ in range(num_series)]
        for series, (x_value, y_value) in enumerate(zip(x, y)):
            if x_value is None or y_value is None:
                continue
            self.X[series].append(x_value)
            self.Y[series].append(y_value)
        canvas = self.axes[0]
        canvas.cla()
        for xs, ys, fmt in zip(self.X, self.Y, self.fmts):
            canvas.plot(xs, ys, fmt)
        self.config_axes()
        display.display(self.fig)
        display.clear_output(wait=True)
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
def train_ch3(net, train_iter, test_iter, loss, num_epochs, updater):
    """Train `net` for `num_epochs` epochs while plotting progress (Chapter 3).

    After training, assertions check that the final training loss is below
    0.5 and that both accuracies lie in (0.7, 1].
    """
    curve_names = ['train loss', 'train acc', 'test acc']
    animator = Animator(xlabel='epoch', xlim=[1, num_epochs],
                        ylim=[0.3, 0.9], legend=curve_names)
    for epoch in range(num_epochs):
        train_metrics = train_epoch_ch3(net, train_iter, loss, updater)
        test_acc = evaluate_accuracy(net, test_iter)
        animator.add(epoch + 1, train_metrics + (test_acc,))
    train_loss, train_acc = train_metrics
    assert train_loss < 0.5, train_loss
    assert 0.7 < train_acc <= 1, train_acc
    assert 0.7 < test_acc <= 1, test_acc
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
class Updater():
    """Callable that applies minibatch SGD to a fixed list of parameters."""

    def __init__(self, params, lr):
        self.params, self.lr = params, lr

    def __call__(self, batch_size, grads):
        # Delegates to `d2l.sgd` with the stored parameters and learning rate
        d2l.sgd(self.params, grads, self.lr, batch_size)
# Defined in file: ./chapter_linear-networks/softmax-regression-scratch.md
def predict_ch3(net, test_iter, n=6):
    """Show the first `n` images of the first batch with true/predicted labels.

    Each title holds the true label on the first line and the prediction
    on the second. Images are reshaped to 28 x 28 for display.
    """
    # Only the first batch is needed
    for X, y in test_iter:
        break
    true_names = d2l.get_fashion_mnist_labels(y)
    predicted = d2l.argmax(net(X), axis=1)
    predicted_names = d2l.get_fashion_mnist_labels(predicted)
    titles = []
    for true, pred in zip(true_names, predicted_names):
        titles.append(f'{true}\n{pred}')
    images = d2l.reshape(X[0:n], (n, 28, 28))
    d2l.show_images(images, 1, n, titles=titles[0:n])
# Defined in file: ./chapter_multilayer-perceptrons/underfit-overfit.md
def evaluate_loss(net, data_iter, loss):
    """Return the summed loss divided by the number of loss elements.

    `loss` is called as `loss(net(X), y)` for every batch in `data_iter`.
    """
    # Running totals: sum of losses, number of loss elements
    totals = d2l.Accumulator(2)
    for X, y in data_iter:
        batch_loss = loss(net(X), y)
        totals.add(d2l.reduce_sum(batch_loss), d2l.size(batch_loss))
    loss_sum, count = totals[0], totals[1]
    return loss_sum / count
# Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md
# Registry of downloadable datasets: name -> (url, sha1 hex digest)
DATA_HUB = {}
# Common URL prefix of the hosted dataset files
DATA_URL = 'http://d2l-data.s3-accelerate.amazonaws.com/'


# Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md
def download(name, cache_dir=os.path.join('..', 'data')):
    """Download a file registered in DATA_HUB and return its local path.

    The file is stored in `cache_dir` under the last path component of its
    URL. If a file with that name already exists and its SHA-1 digest
    matches the registered one, it is reused without downloading.

    Raises:
        AssertionError: if `name` is not a key of DATA_HUB.
        requests.HTTPError: if the server answers with an error status.
    """
    assert name in DATA_HUB, f"{name} does not exist in {DATA_HUB}."
    url, sha1_hash = DATA_HUB[name]
    os.makedirs(cache_dir, exist_ok=True)
    fname = os.path.join(cache_dir, url.split('/')[-1])
    if os.path.exists(fname):
        sha1 = hashlib.sha1()
        with open(fname, 'rb') as f:
            # Hash in 1 MiB chunks so large files are not read at once
            while True:
                data = f.read(1048576)
                if not data:
                    break
                sha1.update(data)
        if sha1.hexdigest() == sha1_hash:
            return fname  # Hit cache
    print(f'Downloading {fname} from {url}...')
    with requests.get(url, stream=True, verify=True) as r:
        # Fail loudly instead of saving an HTTP error page as the dataset;
        # checked before opening `fname` so a stale copy is not truncated
        r.raise_for_status()
        with open(fname, 'wb') as f:
            # Actually stream the body rather than buffering it in memory
            for chunk in r.iter_content(chunk_size=1048576):
                f.write(chunk)
    return fname
# Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md
def download_extract(name, folder=None):
    """Download a zip/tar archive from DATA_HUB and extract it in place.

    The archive is extracted into the directory that contains it. Returns
    `<that directory>/<folder>` when `folder` is given, otherwise the
    archive path with its last extension removed.

    Raises:
        AssertionError: if the file extension is not `.zip`, `.tar` or `.gz`.
    """
    fname = download(name)
    base_dir = os.path.dirname(fname)
    data_dir, ext = os.path.splitext(fname)
    if ext == '.zip':
        open_archive = zipfile.ZipFile
    elif ext in ('.tar', '.gz'):
        open_archive = tarfile.open
    else:
        # Raised explicitly (same type and message as the former
        # `assert False`) so the check also works under `python -O`
        raise AssertionError('Only zip/tar files can be extracted.')
    # The context manager closes the archive handle after extraction.
    # NOTE(review): `extractall` trusts the member paths in the archive.
    with open_archive(fname, 'r') as fp:
        fp.extractall(base_dir)
    return os.path.join(base_dir, folder) if folder else data_dir
def download_all():
    """Fetch every dataset that is registered in DATA_HUB."""
    for dataset_name in DATA_HUB:
        download(dataset_name)
# Defined in file: ./chapter_multilayer-perceptrons/kaggle-house-price.md
# Kaggle house-price data: (url, sha1 hex digest) for the training split
DATA_HUB['kaggle_house_train'] = (
    DATA_URL + 'kaggle_house_pred_train.csv',
    '585e9cc93e70b39160e7921475f9bcd7d31219ce',
)

# ... and for the test split
DATA_HUB['kaggle_house_test'] = (
    DATA_URL + 'kaggle_house_pred_test.csv',
    'fa19780a7b011d9b009e8bff8e99922a8ee2eb90',
)


# Defined in file: ./chapter_deep-learning-computation/use-gpu.md
def try_gpu(i=0):
    """Return a device context for GPU `i` if it exists, else for the CPU."""
    gpus = tf.config.experimental.list_physical_devices('GPU')
    device_name = f'/GPU:{i}' if len(gpus) >= i + 1 else '/CPU:0'
    return tf.device(device_name)
def try_all_gpus():
    """Return device contexts for all GPUs, or a one-element CPU list."""
    num_gpus = len(tf.config.experimental.list_physical_devices('GPU'))
    if num_gpus == 0:
        return [tf.device('/CPU:0')]
    return [tf.device(f'/GPU:{i}') for i in range(num_gpus)]
# Defined in file: ./chapter_convolutional-neural-networks/conv-layer.md
def corr2d(X, K):
    """2-D cross-correlation of `X` with kernel `K` (no padding, stride 1).

    Returns a `tf.Variable` of shape
    `(X.shape[0] - h + 1, X.shape[1] - w + 1)` for an `h` x `w` kernel.
    """
    kernel_h, kernel_w = K.shape
    out_h = X.shape[0] - kernel_h + 1
    out_w = X.shape[1] - kernel_w + 1
    Y = tf.Variable(tf.zeros((out_h, out_w)))
    for row in range(out_h):
        for col in range(out_w):
            window = X[row:row + kernel_h, col:col + kernel_w]
            Y[row, col].assign(tf.reduce_sum(window * K))
    return Y
# Defined in file: ./chapter_convolutional-neural-networks/lenet.md
class TrainCallback(tf.keras.callbacks.Callback):
    """A Keras callback that visualizes the training progress.

    After every epoch the training loss, training accuracy and test
    accuracy are added to an animated plot; after the last epoch a summary
    with the throughput is printed.
    """

    def __init__(self, net, train_iter, test_iter, num_epochs, device_name):
        self.net = net
        self.train_iter = train_iter
        self.test_iter = test_iter
        self.num_epochs = num_epochs
        self.device_name = device_name
        self.timer = d2l.Timer()
        self.animator = d2l.Animator(
            xlabel='epoch', xlim=[1, num_epochs],
            legend=['train loss', 'train acc', 'test acc'])

    def on_epoch_begin(self, epoch, logs=None):
        self.timer.start()

    def on_epoch_end(self, epoch, logs):
        self.timer.stop()
        evaluation = self.net.evaluate(self.test_iter, verbose=0,
                                       return_dict=True)
        test_acc = evaluation['accuracy']
        metrics = (logs['loss'], logs['accuracy'], test_acc)
        self.animator.add(epoch + 1, metrics)
        is_last_epoch = epoch == self.num_epochs - 1
        if not is_last_epoch:
            return
        # Estimate examples per epoch as (size of first batch) x (batches)
        batch_size = next(iter(self.train_iter))[0].shape[0]
        num_batches = tf.data.experimental.cardinality(
            self.train_iter).numpy()
        num_examples = batch_size * num_batches
        print(f'loss {metrics[0]:.3f}, train acc {metrics[1]:.3f}, '
              f'test acc {metrics[2]:.3f}')
        print(f'{num_examples / self.timer.avg():.1f} examples/sec on '
              f'{str(self.device_name)}')
def train_ch6(net_fn, train_iter, test_iter, num_epochs, lr, device):
    """Build, compile and fit a model on a single device (Chapter 6).

    The model returned by `net_fn()` is trained with SGD and sparse
    categorical cross-entropy on logits. Returns the trained model.
    """
    device_name = device._device_name
    strategy = tf.distribute.OneDeviceStrategy(device_name)
    # Optimizer, loss and model are created inside the strategy scope
    with strategy.scope():
        optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
        loss = tf.keras.losses.SparseCategoricalCrossentropy(
            from_logits=True)
        net = net_fn()
        net.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
    progress = TrainCallback(net, train_iter, test_iter, num_epochs,
                             device_name)
    net.fit(train_iter, epochs=num_epochs, verbose=0, callbacks=[progress])
    return net
# Defined in file: ./chapter_convolutional-modern/resnet.md
class Residual(tf.keras.Model):
    """Residual block: two 3x3 convolutions plus a shortcut connection.

    With `use_1x1conv=True` the shortcut passes through a 1x1 convolution
    that uses the same `strides` as the first 3x3 convolution.
    """

    def __init__(self, num_channels, use_1x1conv=False, strides=1):
        super().__init__()
        conv2d = tf.keras.layers.Conv2D
        self.conv1 = conv2d(num_channels, padding='same', kernel_size=3,
                            strides=strides)
        self.conv2 = conv2d(num_channels, kernel_size=3, padding='same')
        self.conv3 = (conv2d(num_channels, kernel_size=1, strides=strides)
                      if use_1x1conv else None)
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.bn2 = tf.keras.layers.BatchNormalization()

    def call(self, X):
        relu = tf.keras.activations.relu
        out = relu(self.bn1(self.conv1(X)))
        out = self.bn2(self.conv2(out))
        shortcut = X if self.conv3 is None else self.conv3(X)
        out = out + shortcut
        return relu(out)
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md
# Register the "time machine" text: (url, sha1 hex digest)
d2l.DATA_HUB['time_machine'] = (
    d2l.DATA_URL + 'timemachine.txt',
    '090b5e7e70c295757f55df93cb0a180b9691891a',
)
def read_time_machine():
    """Read the time machine text and return its cleaned lines.

    Every run of non-letter characters is replaced by one space; each line
    is then stripped and lower-cased.
    """
    with open(d2l.download('time_machine'), 'r') as f:
        raw_lines = f.readlines()
    cleaned = []
    for line in raw_lines:
        letters_only = re.sub('[^A-Za-z]+', ' ', line)
        cleaned.append(letters_only.strip().lower())
    return cleaned
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md
def tokenize(lines, token='word'):
    """Split each text line into a list of word or character tokens.

    For an unknown `token` type an error message is printed and `None` is
    returned.
    """
    if token == 'word':
        words_per_line = []
        for line in lines:
            words_per_line.append(line.split())
        return words_per_line
    if token == 'char':
        chars_per_line = []
        for line in lines:
            chars_per_line.append(list(line))
        return chars_per_line
    print('ERROR: unknown token type: ' + token)
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md
class Vocab:
    """Bidirectional mapping between tokens and integer indices.

    Index 0 is the unknown token `<unk>`, followed by the reserved tokens,
    followed by the corpus tokens in order of decreasing frequency (tokens
    rarer than `min_freq` are left out).
    """

    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        tokens = [] if tokens is None else tokens
        reserved_tokens = [] if reserved_tokens is None else reserved_tokens
        # (token, frequency) pairs, most frequent first
        frequencies = count_corpus(tokens)
        self.token_freqs = sorted(frequencies.items(),
                                  key=lambda pair: pair[1], reverse=True)
        self.unk = 0
        special = ['<unk>'] + reserved_tokens
        frequent = [
            token for token, freq in self.token_freqs
            if freq >= min_freq and token not in special]
        self.idx_to_token = special + frequent
        self.token_to_idx = {
            token: index for index, token in enumerate(self.idx_to_token)}

    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, tokens):
        # A list/tuple is translated element by element (recursively);
        # anything else is looked up directly, falling back to `<unk>`
        if isinstance(tokens, (list, tuple)):
            return [self[token] for token in tokens]
        return self.token_to_idx.get(tokens, self.unk)

    def to_tokens(self, indices):
        """Translate one index, or a list/tuple of indices, back to tokens."""
        if isinstance(indices, (list, tuple)):
            return [self.idx_to_token[index] for index in indices]
        return self.idx_to_token[indices]
def count_corpus(tokens):
    """Return a `collections.Counter` with the frequency of every token.

    `tokens` is either a flat list of tokens or a list of token lists; the
    nested form (recognized by its first element being a list) is flattened
    before counting.
    """
    is_nested = len(tokens) == 0 or isinstance(tokens[0], list)
    if not is_nested:
        return collections.Counter(tokens)
    flat = []
    for line in tokens:
        for token in line:
            flat.append(token)
    return collections.Counter(flat)
# Defined in file: ./chapter_recurrent-neural-networks/text-preprocessing.md
def load_corpus_time_machine(max_tokens=-1):
    """Return the time machine text as character indices plus its vocabulary.

    All lines are flattened into one index list. When `max_tokens` is
    positive, only the first `max_tokens` indices are kept.
    """
    char_lines = tokenize(read_time_machine(), 'char')
    vocab = Vocab(char_lines)
    corpus = []
    for line in char_lines:
        for token in line:
            corpus.append(vocab[token])
    if max_tokens > 0:
        return corpus[:max_tokens], vocab
    return corpus, vocab
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
def seq_data_iter_random(corpus, batch_size, num_steps):
    """Yield minibatches `(X, Y)` of randomly ordered subsequences.

    The corpus is cut, after a random offset in `[0, num_steps - 1]`, into
    consecutive subsequences of length `num_steps`; these are shuffled and
    grouped into batches of `batch_size`. `Y` holds the same subsequences
    shifted by one token.
    """
    corpus = corpus[random.randint(0, num_steps - 1):]
    # One token is held back because the labels are shifted by one
    num_subseqs = (len(corpus) - 1) // num_steps
    starts = [k * num_steps for k in range(num_subseqs)]
    # After shuffling, neighbouring minibatches need not be adjacent in
    # the original sequence
    random.shuffle(starts)

    num_batches = num_subseqs // batch_size
    for batch_index in range(num_batches):
        first = batch_index * batch_size
        batch_starts = starts[first:first + batch_size]
        X = [corpus[pos:pos + num_steps] for pos in batch_starts]
        Y = [corpus[pos + 1:pos + 1 + num_steps] for pos in batch_starts]
        yield d2l.tensor(X), d2l.tensor(Y)
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
def seq_data_iter_sequential(corpus, batch_size, num_steps):
    """Yield minibatches `(X, Y)` by sequential partitioning.

    After a random offset in `[0, num_steps]`, the corpus is reshaped into
    `batch_size` rows, and windows of `num_steps` columns are yielded from
    left to right. `Y` is `X` shifted by one token.
    """
    offset = random.randint(0, num_steps)
    # Largest multiple of `batch_size` that still leaves room for the labels
    num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
    inputs = corpus[offset:offset + num_tokens]
    labels = corpus[offset + 1:offset + 1 + num_tokens]
    Xs = d2l.reshape(d2l.tensor(inputs), (batch_size, -1))
    Ys = d2l.reshape(d2l.tensor(labels), (batch_size, -1))
    num_batches = Xs.shape[1] // num_steps
    for batch_index in range(num_batches):
        left = batch_index * num_steps
        right = left + num_steps
        yield Xs[:, left:right], Ys[:, left:right]
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
class SeqDataLoader:
    """Iterable over minibatches of the time machine corpus.

    `use_random_iter` selects random sampling; otherwise sequential
    partitioning is used.
    """

    def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
        self.data_iter_fn = (d2l.seq_data_iter_random if use_random_iter
                             else d2l.seq_data_iter_sequential)
        self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens)
        self.batch_size = batch_size
        self.num_steps = num_steps

    def __iter__(self):
        # Every call creates a fresh generator over the corpus
        return self.data_iter_fn(self.corpus, self.batch_size,
                                 self.num_steps)
# Defined in file: ./chapter_recurrent-neural-networks/language-models-and-dataset.md
def load_data_time_machine(batch_size, num_steps, use_random_iter=False,
                           max_tokens=10000):
    """Build a `SeqDataLoader` for the time machine text.

    Returns the loader together with its vocabulary.
    """
    loader = SeqDataLoader(batch_size, num_steps, use_random_iter,
                           max_tokens)
    return loader, loader.vocab
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
class RNNModelScratch:
    """RNN wrapper that combines a state initializer and a forward function."""

    def __init__(self, vocab_size, num_hiddens, init_state, forward_fn):
        self.vocab_size = vocab_size
        self.num_hiddens = num_hiddens
        self.init_state = init_state
        self.forward_fn = forward_fn

    def __call__(self, X, state, params):
        # Transpose `X`, one-hot encode it over the vocabulary and cast the
        # result to float32 before handing it to the forward function
        one_hot = tf.one_hot(tf.transpose(X), self.vocab_size)
        inputs = tf.cast(one_hot, tf.float32)
        return self.forward_fn(inputs, state, params)

    def begin_state(self, batch_size):
        """Return the initial state produced by `init_state`."""
        return self.init_state(batch_size, self.num_hiddens)
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
def predict_ch8(prefix, num_preds, net, vocab, params):
    """Generate `num_preds` characters that continue `prefix`.

    The state is first warmed up on the characters of `prefix`; afterwards
    the arg-max output of each step is fed back as the next input. Returns
    the prefix followed by the generated characters.
    """
    state = net.begin_state(batch_size=1)
    outputs = [vocab[prefix[0]]]

    def latest_as_input():
        # The most recent index, shaped (1, 1), as a NumPy array
        return d2l.reshape(d2l.tensor([outputs[-1]]), (1, 1)).numpy()

    # Warm-up period: feed the known prefix, ignore the predictions
    for char in prefix[1:]:
        _, state = net(latest_as_input(), state, params)
        outputs.append(vocab[char])
    # Predict `num_preds` steps
    for _ in range(num_preds):
        y, state = net(latest_as_input(), state, params)
        predicted = y.numpy().argmax(axis=1).reshape(1)
        outputs.append(int(predicted))
    return ''.join([vocab.idx_to_token[i] for i in outputs])
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
def grad_clipping(grads, theta):
    """Rescale `grads` so that their global L2 norm is at most `theta`.

    Returns a new list; gradients are passed through unchanged when the
    norm does not exceed `theta`.
    """
    theta = tf.constant(theta, dtype=tf.float32)
    squared_total = sum((tf.reduce_sum(grad**2)).numpy() for grad in grads)
    norm = tf.cast(tf.math.sqrt(squared_total), tf.float32)
    if tf.greater(norm, theta):
        return [grad * theta / norm for grad in grads]
    return list(grads)
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
def train_epoch_ch8(net, train_iter, loss, updater, params, use_random_iter):
    """Train the model for one epoch (Chapter 8).

    Returns `(perplexity, tokens per second)`, where the perplexity is the
    exponential of the average loss per token.
    """
    state = None
    timer = d2l.Timer()
    # Running totals: sum of training loss, number of tokens
    totals = d2l.Accumulator(2)
    for X, Y in train_iter:
        if state is None or use_random_iter:
            # A fresh state is needed on the first iteration and, with
            # random sampling, for every minibatch
            state = net.begin_state(batch_size=X.shape[0])
        with tf.GradientTape(persistent=True) as tape:
            tape.watch(params)
            y_hat, state = net(X, state, params)
            y = d2l.reshape(tf.transpose(Y), (-1))
            l = loss(y, y_hat)
        clipped = grad_clipping(tape.gradient(l, params), 1)
        updater.apply_gradients(zip(clipped, params))
        # Keras loss by default returns the average loss in a batch, so it
        # is scaled by the number of tokens to obtain a sum
        num_tokens = d2l.size(y)
        totals.add(l * num_tokens, num_tokens)
    perplexity = math.exp(totals[0] / totals[1])
    tokens_per_sec = totals[1] / timer.stop()
    return perplexity, tokens_per_sec
# Defined in file: ./chapter_recurrent-neural-networks/rnn-scratch.md
def train_ch8(net, train_iter, vocab, num_hiddens, lr, num_epochs, strategy,
              use_random_iter=False):
    """Train a character-level model and print sample predictions (Chapter 8).

    Every 10th epoch a continuation of 'time traveller' is printed and the
    perplexity is added to the plot.
    """
    with strategy.scope():
        # NOTE(review): `get_params` is not defined in the visible part of
        # this file -- confirm that it is provided elsewhere.
        params = get_params(len(vocab), num_hiddens)
        loss = tf.keras.losses.SparseCategoricalCrossentropy(
            from_logits=True)
        updater = tf.keras.optimizers.SGD(lr)
    animator = d2l.Animator(xlabel='epoch', ylabel='perplexity',
                            legend=['train'], xlim=[10, num_epochs])

    def predict(prefix):
        return predict_ch8(prefix, 50, net, vocab, params)

    # Train and predict
    for epoch in range(num_epochs):
        ppl, speed = train_epoch_ch8(net, train_iter, loss, updater, params,
                                     use_random_iter)
        if (epoch + 1) % 10 != 0:
            continue
        print(predict('time traveller'))
        animator.add(epoch + 1, [ppl])
    device = d2l.try_gpu()._device_name
    print(f'perplexity {ppl:.1f}, {speed:.1f} tokens/sec on {str(device)}')
    print(predict('time traveller'))
    print(predict('traveller'))
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
# Register the English-French archive: (url, sha1 hex digest)
d2l.DATA_HUB['fra-eng'] = (
    d2l.DATA_URL + 'fra-eng.zip',
    '94646ad1522d915e7b0f9296181140edcf86a4f5',
)
def read_data_nmt():
    """Load the English-French dataset and return it as one string.

    The 'fra-eng' archive is downloaded and extracted if necessary, then
    `fra.txt` is read in full.
    """
    data_dir = d2l.download_extract('fra-eng')
    # Decode explicitly as UTF-8: without `encoding`, `open` falls back to
    # the platform's locale encoding, which is not UTF-8 everywhere.
    # NOTE(review): assumes `fra.txt` is UTF-8 encoded -- confirm.
    with open(os.path.join(data_dir, 'fra.txt'), 'r',
              encoding='utf-8') as f:
        return f.read()
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
def preprocess_nmt(text):
    """Normalize the raw English-French text.

    Non-breaking spaces (U+202F and U+00A0) become ordinary spaces, the
    text is lower-cased, and a space is inserted in front of each of the
    punctuation marks `,.!?` unless one is already there or the mark is
    the very first character.
    """
    # Built once instead of on every character test
    punctuation = frozenset(',.!?')

    def no_space(char, prev_char):
        return char in punctuation and prev_char != ' '

    # Replace non-breaking space with space, and convert uppercase letters
    # to lowercase ones
    text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
    # Insert space between words and punctuation marks
    out = [
        ' ' + char if i > 0 and no_space(char, text[i - 1]) else char
        for i, char in enumerate(text)]
    return ''.join(out)
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
def tokenize_nmt(text, num_examples=None):
    """Tokenize the English-French dataset.

    `text` holds one example per line, with source and target separated by
    a tab. At most the first `num_examples` lines are read (all lines when
    `num_examples` is `None` or 0); lines that do not consist of exactly
    two tab-separated parts are skipped. Both parts are split on single
    spaces.

    Returns:
        `(source, target)`: two parallel lists of token lists.
    """
    source, target = [], []
    for i, line in enumerate(text.split('\n')):
        # `i` is zero-based: stop as soon as `num_examples` lines were read
        # (the former `i > num_examples` let one extra line through)
        if num_examples and i >= num_examples:
            break
        parts = line.split('\t')
        if len(parts) == 2:
            source.append(parts[0].split(' '))
            target.append(parts[1].split(' '))
    return source, target
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
def truncate_pad(line, num_steps, padding_token):
    """Return `line` cut or right-padded to exactly `num_steps` items."""
    missing = num_steps - len(line)
    if missing < 0:
        # Too long: keep only the first `num_steps` items
        return line[:num_steps]
    # Too short (or exact): append the padding token `missing` times
    return line + [padding_token] * missing
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
def build_array_nmt(lines, vocab, num_steps):
    """Turn token sequences into a padded index tensor plus valid lengths.

    Each sequence is mapped through `vocab`, terminated with `<eos>`, and
    truncated or padded with `<pad>` to `num_steps`. The valid length of a
    row is its number of non-`<pad>` entries.
    """
    eos_id, pad_id = vocab['<eos>'], vocab['<pad>']
    rows = []
    for tokens in lines:
        indices = vocab[tokens] + [eos_id]
        rows.append(truncate_pad(indices, num_steps, pad_id))
    array = d2l.tensor(rows)
    not_padding = d2l.astype(array != pad_id, d2l.int32)
    valid_len = d2l.reduce_sum(not_padding, 1)
    return array, valid_len
# Defined in file: ./chapter_recurrent-modern/machine-translation-and-dataset.md
def load_data_nmt(batch_size, num_steps, num_examples=600):
    """Return the data iterator and both vocabularies for translation.

    Batches consist of (source indices, source valid lengths, target
    indices, target valid lengths). Tokens seen fewer than 2 times are
    left out of the vocabularies.
    """
    reserved = ('<pad>', '<bos>', '<eos>')
    text = preprocess_nmt(read_data_nmt())
    source, target = tokenize_nmt(text, num_examples)
    src_vocab = d2l.Vocab(source, min_freq=2, reserved_tokens=list(reserved))
    tgt_vocab = d2l.Vocab(target, min_freq=2, reserved_tokens=list(reserved))
    src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
    tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
    data_iter = d2l.load_array(
        (src_array, src_valid_len, tgt_array, tgt_valid_len), batch_size)
    return data_iter, src_vocab, tgt_vocab
# Defined in file: ./chapter_attention-mechanisms/attention-cues.md
def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5),
                  cmap='Reds'):
    """Draw a grid of heatmaps with a shared colorbar.

    The first two axes of `matrices` index the grid row and column. The
    x-label is shown on the bottom row only, the y-label on the first
    column only; `titles[j]` is used for column `j` when `titles` is given.
    """
    d2l.use_svg_display()
    num_rows, num_cols = matrices.shape[0], matrices.shape[1]
    fig, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
                                 sharex=True, sharey=True, squeeze=False)
    last_row = num_rows - 1
    for row, (row_axes, row_matrices) in enumerate(zip(axes, matrices)):
        for col, (ax, matrix) in enumerate(zip(row_axes, row_matrices)):
            pcm = ax.imshow(d2l.numpy(matrix), cmap=cmap)
            if row == last_row:
                ax.set_xlabel(xlabel)
            if col == 0:
                ax.set_ylabel(ylabel)
            if titles:
                ax.set_title(titles[col])
    # The colorbar is built from the last image that was drawn
    fig.colorbar(pcm, ax=axes, shrink=0.6)
# Defined in file: ./chapter_optimization/optimization-intro.md
def annotate(text, xy, xytext):
    """Annotate point `xy` on the current axes with `text` and an arrow."""
    arrow = dict(arrowstyle='->')
    current_axes = d2l.plt.gca()
    current_axes.annotate(text, xy=xy, xytext=xytext, arrowprops=arrow)
# Defined in file: ./chapter_optimization/gd.md
def train_2d(trainer, steps=20):
    """Run a custom `trainer` on a 2-D problem and record the trajectory.

    Starting from `(x1, x2) = (-5, -2)` with two zeroed internal state
    variables, `trainer(x1, x2, s1, s2)` is applied `steps` times. Returns
    the list of visited `(x1, x2)` points, including the start.
    """
    pos1, pos2 = -5, -2
    # Internal state variables that are only threaded through the trainer
    state1, state2 = 0, 0
    trajectory = [(pos1, pos2)]
    for _ in range(steps):
        pos1, pos2, state1, state2 = trainer(pos1, pos2, state1, state2)
        trajectory.append((pos1, pos2))
    return trajectory
def show_trace_2d(f, results):
    """Plot an optimization trajectory over the contour lines of `f`.

    `results` is a sequence of `(x1, x2)` points; `f` is evaluated on a
    grid covering [-5.5, 1.0) x [-3.0, 1.0) with step 0.1.
    """
    d2l.set_figsize()
    d2l.plt.plot(*zip(*results), '-o', color='#ff7f0e')
    grid_x1, grid_x2 = d2l.meshgrid(d2l.arange(-5.5, 1.0, 0.1),
                                    d2l.arange(-3.0, 1.0, 0.1))
    heights = f(grid_x1, grid_x2)
    d2l.plt.contour(grid_x1, grid_x2, heights, colors='#1f77b4')
    d2l.plt.xlabel('x1')
    d2l.plt.ylabel('x2')
# Defined in file: ./chapter_optimization/minibatch-sgd.md
# Register the airfoil self-noise data: (url, sha1 hex digest)
d2l.DATA_HUB['airfoil'] = (
    d2l.DATA_URL + 'airfoil_self_noise.dat',
    '76e5be1548fd8222e5074cf0faae75edff8cf93f',
)
def get_data_ch11(batch_size=10, n=1500):
    """Load the airfoil dataset, standardized column by column.

    The first `n` rows are used; the last column is the label. Returns the
    training iterator and the number of feature columns.
    """
    raw = np.genfromtxt(d2l.download('airfoil'), dtype=np.float32,
                        delimiter='\t')
    standardized = (raw - raw.mean(axis=0)) / raw.std(axis=0)
    features = standardized[:n, :-1]
    labels = standardized[:n, -1]
    data_iter = d2l.load_array((features, labels), batch_size, is_train=True)
    return data_iter, standardized.shape[1] - 1
# Defined in file: ./chapter_optimization/minibatch-sgd.md
def train_ch11(trainer_fn, states, hyperparams, data_iter, feature_dim,
               num_epochs=2):
    """Train linear regression from scratch with a custom `trainer_fn`.

    `trainer_fn(params, grads, states, hyperparams)` performs the update.
    Whenever the number of processed examples is a multiple of 200, the
    loss on `data_iter` is evaluated and plotted. Returns the cumulative
    timings and the list of recorded losses.
    """
    # Initialization
    initial_w = tf.random.normal(shape=(feature_dim, 1), mean=0, stddev=0.01)
    w = tf.Variable(initial_w, trainable=True)
    b = tf.Variable(tf.zeros(1), trainable=True)

    def net(X):
        return d2l.linreg(X, w, b)

    loss = d2l.squared_loss
    # Train
    animator = d2l.Animator(xlabel='epoch', ylabel='loss',
                            xlim=[0, num_epochs], ylim=[0.22, 0.35])
    n = 0
    timer = d2l.Timer()
    for _ in range(num_epochs):
        for X, y in data_iter:
            with tf.GradientTape() as tape:
                l = tf.math.reduce_mean(loss(net(X), y))
            dw, db = tape.gradient(l, [w, b])
            trainer_fn([w, b], [dw, db], states, hyperparams)
            n += X.shape[0]
            if n % 200 != 0:
                continue
            # Pause the timer so that evaluation is not counted
            timer.stop()
            batches_done = n / X.shape[0]
            batches_per_epoch = tf.data.experimental.cardinality(
                data_iter).numpy()
            epoch_fraction = batches_done / batches_per_epoch
            current_loss = (d2l.evaluate_loss(net, data_iter, loss),)
            animator.add(epoch_fraction, current_loss)
            timer.start()
    print(f'loss: {animator.Y[0][-1]:.3f}, {timer.avg():.3f} sec/epoch')
    return timer.cumsum(), animator.Y[0]
# Defined in file: ./chapter_optimization/minibatch-sgd.md
def train_concise_ch11(trainer_fn, hyperparams, data_iter, num_epochs=2):
    """Train linear regression with a Keras layer and a Keras optimizer.

    `trainer_fn(**hyperparams)` builds the optimizer. Whenever the number
    of processed examples is a multiple of 200, the halved MSE on
    `data_iter` is evaluated and plotted.
    """
    # Initialization
    initializer = tf.random_normal_initializer(stddev=0.01)
    net = tf.keras.Sequential()
    net.add(tf.keras.layers.Dense(1, kernel_initializer=initializer))
    optimizer = trainer_fn(**hyperparams)
    # TensorFlow's MSE loss is twice the L2 loss (1/2 * MSE), so every loss
    # value below is halved
    loss = tf.keras.losses.MeanSquaredError()
    animator = d2l.Animator(xlabel='epoch', ylabel='loss',
                            xlim=[0, num_epochs], ylim=[0.22, 0.35])
    n = 0
    timer = d2l.Timer()
    for _ in range(num_epochs):
        for X, y in data_iter:
            with tf.GradientTape() as tape:
                out = net(X)
                l = loss(y, out) / 2
                params = net.trainable_variables
            grads = tape.gradient(l, params)
            optimizer.apply_gradients(zip(grads, params))
            n += X.shape[0]
            if n % 200 != 0:
                continue
            # Pause the timer so that evaluation is not counted
            timer.stop()
            batches_done = n / X.shape[0]
            batches_per_epoch = tf.data.experimental.cardinality(
                data_iter).numpy()
            epoch_fraction = batches_done / batches_per_epoch
            current_loss = (d2l.evaluate_loss(net, data_iter, loss) / 2,)
            animator.add(epoch_fraction, current_loss)
            timer.start()
    print(f'loss: {animator.Y[0][-1]:.3f}, {timer.avg():.3f} sec/epoch')
# Defined in file: ./chapter_computer-vision/bounding-box.md
def box_corner_to_center(boxes):
    """Convert boxes from (x1, y1, x2, y2) corners to (cx, cy, w, h).

    Columns 0-3 of `boxes` are read; the result is stacked along the last
    axis.
    """
    left, top, right, bottom = (boxes[:, k] for k in range(4))
    center_x = (left + right) / 2
    center_y = (top + bottom) / 2
    width = right - left
    height = bottom - top
    return d2l.stack((center_x, center_y, width, height), axis=-1)
def box_center_to_corner(boxes):
    """Convert boxes from (cx, cy, w, h) to (x1, y1, x2, y2) corners.

    Columns 0-3 of `boxes` are read; the result is stacked along the last
    axis.
    """
    center_x, center_y, width, height = (boxes[:, k] for k in range(4))
    half_width = 0.5 * width
    half_height = 0.5 * height
    corners = (center_x - half_width, center_y - half_height,
               center_x + half_width, center_y + half_height)
    return d2l.stack(corners, axis=-1)
# Defined in file: ./chapter_computer-vision/bounding-box.md
def bbox_to_rect(bbox, color):
    """Build an unfilled matplotlib `Rectangle` from a corner-format box.

    `bbox` is (top-left x, top-left y, bottom-right x, bottom-right y);
    matplotlib expects the upper-left corner plus width and height.
    """
    left, top = bbox[0], bbox[1]
    width = bbox[2] - bbox[0]
    height = bbox[3] - bbox[1]
    return d2l.plt.Rectangle(xy=(left, top), width=width, height=height,
                             fill=False, edgecolor=color, linewidth=2)
# Alias defined in config.ini
def size(a):
    """Return `tf.size(a)` converted with `.numpy()`."""
    return tf.size(a).numpy()


def numpy(x, *args, **kwargs):
    """Call `x.numpy(...)`, forwarding all extra arguments."""
    return x.numpy(*args, **kwargs)


# Tensor creation
tensor = tf.constant
ones = tf.ones
zeros = tf.zeros
eye = tf.eye
arange = tf.range
linspace = tf.linspace
meshgrid = tf.meshgrid
normal = tf.random.normal
rand = tf.random.uniform

# Element-wise math
sin = tf.sin
sinh = tf.sinh
cos = tf.cos
cosh = tf.cosh
tanh = tf.tanh
exp = tf.exp
abs = tf.abs

# Shape manipulation, reductions and linear algebra
reshape = tf.reshape
transpose = tf.transpose
concat = tf.concat
stack = tf.stack
matmul = tf.matmul
reduce_sum = tf.reduce_sum
argmax = tf.argmax

# Dtypes and casting
astype = tf.cast
int32 = tf.int32
float32 = tf.float32