
refactor: merge multiple projects into one and create a new project

2026-04-07 08:30:41 +08:00
parent 7aa7ae3335
commit 6cb1a89751
49 changed files with 2932 additions and 4 deletions

10
dl-exp/.gitignore vendored Normal file

@@ -0,0 +1,10 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv

1
dl-exp/.python-version Normal file

@@ -0,0 +1 @@
3.11

1
dl-exp/.style.yapf Normal file

@@ -0,0 +1 @@
column_limit=120

0
dl-exp/README.md Normal file

0
dl-exp/__init__.py Normal file

0
dl-exp/exp1/__init__.py Normal file

90
dl-exp/exp1/modified.py Normal file

@@ -0,0 +1,90 @@
from enum import IntEnum, auto
from pathlib import Path
import sys
import torch
import matplotlib.pyplot as plt
import torch.nn.functional as F
sys.path.append(str(Path(__file__).resolve().parent.parent))
import gpu_utils


class CurveKind(IntEnum):
    """The kind of curve used when generating the fake data."""
    Polynomials = auto()
    Sine = auto()


class DataSource:
    """Randomly generated fake data used for curve fitting."""
    x: torch.Tensor
    y: torch.Tensor

    def __init__(self, device: torch.device, curve_kind: CurveKind):
        match curve_kind:
            case CurveKind.Polynomials:
                x = torch.linspace(-1, 1, steps=100).reshape(-1, 1)
                y = -x.pow(3) + 2 * x.pow(2) + 0.2 * torch.rand(x.size())
            case CurveKind.Sine:
                # Sample the sine over [0, 2]; otherwise the curve is nearly linear.
                x = torch.linspace(0, 2, steps=100).reshape(-1, 1)
                y = x.sin() + 0.2 * torch.rand(x.size())
        self.x = x.to(device)
        self.y = y.to(device)


class Net(torch.nn.Module):
    """Subclass torch's Module to describe the network."""

    def __init__(self, n_feature, n_hidden, n_output):
        super(Net, self).__init__()  # run the base-class __init__
        # Declare the form of each layer.
        self.hidden1 = torch.nn.Linear(n_feature, n_hidden)  # hidden layer, linear
        self.hidden2 = torch.nn.Linear(n_hidden, n_hidden)  # hidden layer, linear
        self.hidden3 = torch.nn.Linear(n_hidden, n_hidden)  # hidden layer, linear
        self.predict = torch.nn.Linear(n_hidden, n_output)  # output layer, linear

    def forward(self, x):  # this is also Module's forward hook
        # Forward pass: the network turns the input into a prediction.
        x = F.relu(self.hidden1(x))  # activation applied to the hidden layer's linear output
        x = F.relu(self.hidden2(x))
        x = F.relu(self.hidden3(x))
        x = self.predict(x)  # output value
        return x


def main():
    device = gpu_utils.get_gpu_device()
    test_data = DataSource(device, CurveKind.Polynomials)
    net = Net(n_feature=1, n_hidden=20, n_output=1).to(device)
    # The optimizer is the training tool: give it all of net's parameters and a learning rate.
    optimizer = torch.optim.SGD(net.parameters(), lr=0.01)
    loss_func = torch.nn.MSELoss()  # mean squared error between prediction and ground truth
    for t in range(2000):
        optimizer.zero_grad()  # clear the gradients left over from the previous step
        prediction: torch.Tensor = net(test_data.x)  # feed net the training data x, get a prediction
        loss: torch.Tensor = loss_func(prediction, test_data.y)  # compute the error between the two
        loss.backward()  # backpropagate the error and compute the parameter updates
        optimizer.step()  # apply the updates to net's parameters
    # Plot and show the learning result.
    plt.cla()
    plt.scatter(test_data.x.cpu().data.numpy(), test_data.y.cpu().data.numpy())
    plt.scatter(test_data.x.cpu().data.numpy(), prediction.cpu().data.numpy())
    plt.text(0.5,
             0,
             'Loss=%.4f' % loss.cpu().data.numpy(),
             fontdict={
                 'size': 20,
                 'color': 'red'
             })
    plt.show()


if __name__ == "__main__":
    gpu_utils.print_gpu_availability()
    main()

61
dl-exp/exp1/source.py Normal file

@@ -0,0 +1,61 @@
import torch
import matplotlib.pyplot as plt
import torch.nn.functional as F


class Net(torch.nn.Module):  # subclass torch's Module
    def __init__(self, n_feature, n_hidden, n_output):
        super(Net, self).__init__()  # run the base-class __init__
        # Declare the form of each layer.
        self.hidden1 = torch.nn.Linear(n_feature, n_hidden)  # hidden layer, linear
        self.hidden2 = torch.nn.Linear(n_hidden, n_hidden)  # hidden layer, linear
        self.hidden3 = torch.nn.Linear(n_hidden, n_hidden)  # hidden layer, linear
        self.predict = torch.nn.Linear(n_hidden, n_output)  # output layer, linear

    def forward(self, x):  # this is also Module's forward hook
        # Forward pass: the network turns the input into a prediction.
        x = F.relu(self.hidden1(x))  # activation applied to the hidden layer's linear output
        x = F.relu(self.hidden2(x))
        x = F.relu(self.hidden3(x))
        x = self.predict(x)  # output value
        return x


def main():
    x = torch.unsqueeze(torch.linspace(-1, 1, 100),
                        dim=1)  # x data (tensor), shape (100, 1)
    y = -x.pow(3) + 2 * x.pow(2) + 0.2 * torch.rand(x.size())
    # Alternative sine target: y = x.sin() + 0.2 * torch.rand(x.size())
    net = Net(n_feature=1, n_hidden=20, n_output=1)
    # The optimizer is the training tool: give it all of net's parameters and a learning rate.
    optimizer = torch.optim.SGD(net.parameters(), lr=0.01)
    loss_func = torch.nn.MSELoss()  # mean squared error between prediction and ground truth
    for t in range(2000):
        prediction = net(x)  # feed net the training data x, get a prediction
        loss = loss_func(prediction, y)  # compute the error between the two
        optimizer.zero_grad()  # clear the gradients left over from the previous step
        loss.backward()  # backpropagate the error and compute the parameter updates
        optimizer.step()  # apply the updates to net's parameters
        if t % 5 == 0:
            # Plot and show the learning process.
            plt.cla()
            plt.scatter(x.data.numpy(), y.data.numpy())
            plt.scatter(x.data.numpy(), prediction.data.numpy())
            plt.text(0.5,
                     0,
                     'Loss=%.4f' % loss.data.numpy(),
                     fontdict={
                         'size': 20,
                         'color': 'red'
                     })
    plt.show()


if __name__ == "__main__":
    main()

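Note: with this structure the intermediate frames are never displayed; plt.show() only renders the final state. A minimal sketch of a live-updating variant (an assumption about intent, requiring an interactive matplotlib backend; the `...` stands for the training step above):

plt.ion()               # interactive mode: draw without blocking
for t in range(2000):
    ...                 # training step as above
    if t % 5 == 0:
        plt.cla()
        plt.scatter(x.data.numpy(), y.data.numpy())
        plt.scatter(x.data.numpy(), prediction.data.numpy())
        plt.pause(0.1)  # give the GUI time to render this frame
plt.ioff()
plt.show()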
2
dl-exp/exp2/datasets/.gitignore vendored Normal file

@@ -0,0 +1,2 @@
# Ignore datasets file
mnist.npz

3
dl-exp/exp2/models/.gitignore vendored Normal file

@@ -0,0 +1,3 @@
# Ignore every saved model files
*.pth
*.ckpt


@@ -0,0 +1,80 @@
from pathlib import Path
import numpy
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import v2 as tvtrans
import settings


class MnistDataset(Dataset):
    """A custom Dataset adapted to PyTorch, used for loading the MNIST data."""
    shape: int
    transform: tvtrans.Transform
    images_data: numpy.ndarray
    labels_data: torch.Tensor

    def __init__(self, images: numpy.ndarray, labels: numpy.ndarray, transform: tvtrans.Transform):
        images_len: int = images.shape[0]
        labels_len: int = labels.shape[0]
        assert (images_len == labels_len)
        self.shape = images_len
        self.images_data = images
        self.labels_data = torch.from_numpy(labels)
        self.transform = transform

    def __getitem__(self, index):
        return self.transform(self.images_data[index]), self.labels_data[index]

    def __len__(self):
        return self.shape


class MnistDataLoaders:
    """Holds the PyTorch DataLoader for the training data and the one for the test data."""
    train_loader: DataLoader
    test_loader: DataLoader

    def __init__(self, batch_size: int):
        dataset = numpy.load(settings.MNIST_DATASET_PATH)
        # All images are white digits on a black background.
        # 60,000 training images, 60000x28x28. Labels have only the first dimension.
        train_images: numpy.ndarray = dataset['x_train']
        train_labels: numpy.ndarray = dataset['y_train']
        # 10,000 test images, 10000x28x28. Labels have only the first dimension.
        test_images: numpy.ndarray = dataset['x_test']
        test_labels: numpy.ndarray = dataset['y_test']
        # Define the data transform.
        trans = tvtrans.Compose([
            # Convert from uint8 to float32, automatically scaling into the 0-1 range.
            # YYC MARK: the one below is marked as outdated; replaced by the two after it.
            # tvtrans.ToTensor(),
            tvtrans.ToImage(),
            tvtrans.ToDtype(torch.float32, scale=True),
            # To satisfy the colour-channel requirement of the image input later on,
            # a new dimension would have to be squeezed out at the end.
            # YYC MARK: the two steps above already produce that grayscale channel for us.
            # tvtrans.Lambda(lambda x: x.unsqueeze(-1))
            # The specific normalization parameters (0.1307, 0.3081) are the standard ones
            # for MNIST: the global mean and standard deviation of the MNIST training set.
            # This kind of normalization helps numerical stability and convergence speed.
            # YYC MARK: but I don't want it; training converges in the end anyway.
            # tvtrans.Normalize((0.1307,), (0.3081,)),
        ])
        # Create the datasets.
        train_dataset = MnistDataset(train_images, train_labels, transform=trans)
        test_dataset = MnistDataset(test_images, test_labels, transform=trans)
        # Assign to self.
        self.train_loader = DataLoader(dataset=train_dataset,
                                       batch_size=batch_size,
                                       shuffle=False)
        self.test_loader = DataLoader(dataset=test_dataset,
                                      batch_size=batch_size,
                                      shuffle=False)

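A small sketch verifying the YYC MARK claims above (assuming torchvision's v2 transforms as imported there): ToImage plus ToDtype turns a 28x28 uint8 array into a 1x28x28 float32 tensor scaled into [0, 1], grayscale channel included.

import numpy
import torch
from torchvision.transforms import v2 as tvtrans

trans = tvtrans.Compose([
    tvtrans.ToImage(),                           # HW/HWC uint8 -> CHW tensor image
    tvtrans.ToDtype(torch.float32, scale=True),  # uint8 [0, 255] -> float32 [0, 1]
])
fake = numpy.random.randint(0, 256, size=(28, 28), dtype=numpy.uint8)
out = trans(fake)
print(out.shape, out.dtype, float(out.min()), float(out.max()))
# expected: torch.Size([1, 28, 28]) torch.float32, values within [0.0, 1.0]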

@@ -0,0 +1,40 @@
import torch
import torch.nn.functional as F


class Cnn(torch.nn.Module):
    """The convolutional neural network model."""

    def __init__(self):
        super(Cnn, self).__init__()
        self.conv1 = torch.nn.Conv2d(1, 32, kernel_size=(3, 3))
        self.pool1 = torch.nn.MaxPool2d(kernel_size=(2, 2))
        self.conv2 = torch.nn.Conv2d(32, 64, kernel_size=(3, 3))
        self.pool2 = torch.nn.MaxPool2d(kernel_size=(2, 2))
        self.conv3 = torch.nn.Conv2d(64, 64, kernel_size=(3, 3))
        self.flatten = torch.nn.Flatten()
        # 28x28 becomes 26x26 after the first convolution and 13x13 after the first pooling.
        # It becomes 11x11 after the second convolution and 5x5 after the second pooling.
        # It becomes 3x3 after the third convolution.
        # The final convolution has 64 kernels.
        self.fc1 = torch.nn.Linear(64 * 3 * 3, 64)
        self.fc2 = torch.nn.Linear(64, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = F.relu(self.conv3(x))
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        # YYC MARK:
        # Never apply F.softmax(x, dim=1) to the output here!
        # This code was ported from TensorFlow, whose loss function accepts
        # probabilities for the cross-entropy computation, whereas PyTorch expects
        # logits (the values before softmax) for its cross-entropy.
        # So the raw model output is returned directly.
        return x

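A quick sanity check of the shape arithmetic in the comments, as a sketch (it assumes this file is importable as `model`, as predict.py and train.py do):

import torch
from model import Cnn

net = Cnn()
dummy = torch.zeros(1, 1, 28, 28)   # one grayscale 28x28 image
logits = net(dummy)                 # flatten sees 64 * 3 * 3 = 576 features
print(logits.shape)                 # torch.Size([1, 10]) -- raw logits, no softmax
# CrossEntropyLoss applies log-softmax internally, which is why forward() returns logits:
loss = torch.nn.CrossEntropyLoss()(logits, torch.tensor([3]))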

@@ -0,0 +1,135 @@
from pathlib import Path
import sys
import numpy
import torch
import torch.nn.functional as F
from PIL import Image, ImageFile
import matplotlib.pyplot as plt
from model import Cnn
import settings
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
import gpu_utils


class PredictResult:
    """The result of a prediction."""
    possibilities: torch.Tensor
    """The probability of each digit."""

    def __init__(self, possibilities: torch.Tensor):
        """
        Create a prediction result.
        :param possibilities: a tensor with the probability of each digit, i.e. the values
            after softmax. Its shape is 2-D: dim 0 is the batch (which should have size 1)
            and dim 1 holds the probability of each digit.
        """
        self.possibilities = possibilities

    def chosen_number(self) -> int:
        """
        Get the finally chosen digit.
        :return: the digit inferred from the current probability distribution.
        """
        # The output is the probability of each of the 10 digits, so pick the most likely
        # one: take the argmax along dim=1. dim=0 is the batch, so leave it alone.
        return self.possibilities.argmax(1).item()

    def number_possibilities(self) -> list[float]:
        """
        Get the probability of each digit.
        :return: a list of 10 elements where each entry is the probability of the digit
            represented by its index.
        """
        return list(self.possibilities[0][i].item() for i in range(10))


class Predictor:
    device: torch.device
    model: Cnn

    def __init__(self):
        self.device = gpu_utils.get_gpu_device()
        self.model = Cnn().to(self.device)
        # Load the saved model parameters.
        self.model.load_state_dict(torch.load(settings.SAVED_MODEL_PATH))

    def __predict_tensor(self, in_data: torch.Tensor) -> PredictResult:
        """
        The prediction backend used by all other prediction functions. They convert their
        data into a tensor and pass it to this function for the actual prediction.
        :param in_data: the input tensor; its shape must be 28x28 and its dtype float32.
        :return: the prediction result.
        """
        # Upload the tensor to the GPU.
        in_data = in_data.to(self.device)
        # To satisfy the model's input requirements, unsqueeze the first dimension twice:
        # once for the grayscale channel and once for the batch.
        # This is equivalent to computing with batch size 1.
        in_data = in_data.unsqueeze(0).unsqueeze(0)
        # Run the prediction. The model outputs values without softmax, so apply it at the end.
        with torch.no_grad():
            out_data = self.model(in_data)
            out_data = F.softmax(out_data, dim=-1)
        return PredictResult(out_data)

    def predict_sketchpad(self, image: list[list[bool]]) -> PredictResult:
        """
        Predict from sketchpad data.
        :param image: the list's shape must be 28x28.
        :return: the prediction result.
        """
        input = torch.tensor(image, dtype=torch.float32)
        assert (input.dim() == 2)
        assert (input.size(0) == 28)
        assert (input.size(1) == 28)
        return self.__predict_tensor(input)

    def predict_image(self, image: ImageFile.ImageFile) -> PredictResult:
        """
        Predict from Pillow image data.
        :param image: the Pillow image. It must be 28x28 in size.
        :return: the prediction result.
        """
        # Make sure the image is grayscale and has the right width and height.
        grayscale_image = image.convert('L')
        assert (grayscale_image.width == 28)
        assert (grayscale_image.height == 28)
        # Convert to a numpy array. The resulting array is read-only, so copy it first.
        numpy_data = numpy.reshape(grayscale_image, (28, 28), copy=True)
        # Convert to a tensor and set the dtype.
        data = torch.from_numpy(numpy_data).float()
        # Scale by 255, and invert because the input image is black digits on white.
        data.div_(255.0).sub_(1).mul_(-1)
        return self.__predict_tensor(data)


def main():
    predictor = Predictor()
    # Walk every image in the test directory and process it.
    test_dir = Path(__file__).resolve().parent.parent / 'test_images'
    for image_path in test_dir.glob('*.png'):
        if image_path.is_file():
            print(f'Predicting {image_path} ...')
            image = Image.open(image_path)
            rv = predictor.predict_image(image)
            print(f'Predict digit: {rv.chosen_number()}')
            plt.figure(f'Image - {image_path}')
            plt.imshow(image)
            plt.axis('on')
            plt.title(f'Predict digit: {rv.chosen_number()}')
            plt.show()


if __name__ == "__main__":
    gpu_utils.print_gpu_availability()
    main()

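The in-place chain data.div_(255.0).sub_(1).mul_(-1) computes 1 - v/255, mapping white (255) to 0.0 and black (0) to 1.0 to match the white-on-black training data. A one-line check of that arithmetic:

for v in (0, 128, 255):
    print(v, (v / 255.0 - 1) * -1)   # 0 -> 1.0, 128 -> ~0.498, 255 -> 0.0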

@@ -0,0 +1,12 @@
from pathlib import Path

MNIST_DATASET_PATH: Path = Path(__file__).resolve().parent.parent / 'datasets' / 'mnist.npz'
"""Path to the MNIST dataset file."""
SAVED_MODEL_PATH: Path = Path(__file__).resolve().parent.parent / 'models' / 'cnn.pth'
"""Where the trained model is saved."""
N_EPOCH: int = 5
"""Number of epochs during training."""
N_BATCH_SIZE: int = 1000
"""Batch size during training."""


@@ -0,0 +1,190 @@
from pathlib import Path
import sys
import typing
import tkinter as tk
from predict import PredictResult, Predictor
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
import gpu_utils


class SketchpadApp:
    IMAGE_HW: typing.ClassVar[int] = 28
    PIXEL_HW: typing.ClassVar[int] = 15

    def __init__(self, root: tk.Tk, predictor: Predictor):
        self.root = root
        self.root.title("Guess the Digit")
        # Create the sketchpad frame.
        canvas_frame = tk.Frame(root)
        canvas_frame.pack(pady=10)
        # Create a sketchpad the size of the image.
        self.canvas_pixel_count = SketchpadApp.IMAGE_HW
        self.canvas_pixel_size = SketchpadApp.PIXEL_HW  # the size of each pixel
        canvas_hw = self.canvas_pixel_count * self.canvas_pixel_size
        self.canvas_width = canvas_hw
        self.canvas_height = canvas_hw
        self.canvas = tk.Canvas(
            canvas_frame,
            width=self.canvas_width,
            height=self.canvas_height,
            bg='black'
        )
        self.canvas.pack()
        # Store the sketchpad state. False means unpainted (black); True means painted white.
        self.canvas_data = [[False for _ in range(self.canvas_pixel_count)] for _ in range(self.canvas_pixel_count)]
        # Bind the mouse events.
        self.canvas.bind("<B1-Motion>", self.paint)
        self.canvas.bind("<Button-1>", self.paint)
        # Draw the initial grid.
        self.draw_grid()
        # Create the table frame.
        table_frame = tk.Frame(root)
        table_frame.pack(pady=10)
        # Header contents.
        header_words = ("Prediction", ) + tuple(f'P({i})' for i in range(10))
        # Create the header row.
        for col, header in enumerate(header_words):
            header_label = tk.Label(
                table_frame,
                text=header,
                relief="solid",
                borderwidth=1,
                width=12,
                height=2,
                bg="lightblue"
            )
            header_label.grid(row=0, column=col, sticky="nsew")
        # Create the second row (the one that shows the values).
        self.value_labels = []
        for col in range(len(header_words)):
            value_label = tk.Label(
                table_frame,
                text="0.00",  # show 0.00 by default
                relief="solid",
                borderwidth=1,
                width=12,
                height=2,
                bg="white"
            )
            value_label.grid(row=1, column=col, sticky="nsew")
            self.value_labels.append(value_label)
        # Special style for the first column (the guessed digit).
        self.value_labels[0].config(text="N/A", bg="lightyellow")
        # Clear the table contents.
        self.clear_table()
        # Create the button frame.
        button_frame = tk.Frame(root)
        button_frame.pack(pady=10)
        # The execute button.
        execute_button = tk.Button(
            button_frame,
            text="Run",
            command=self.execute,
            bg='lightgreen',
            width=10
        )
        execute_button.pack(side=tk.LEFT, padx=5)
        # The reset button.
        reset_button = tk.Button(
            button_frame,
            text="Reset",
            command=self.reset,
            bg='lightcoral',
            width=10
        )
        reset_button.pack(side=tk.LEFT, padx=5)
        # Set the predictor used for execution.
        self.predictor = predictor

    # region: sketchpad part
    canvas: tk.Canvas
    canvas_data: list[list[bool]]
    canvas_width: int
    canvas_height: int

    def draw_grid(self):
        """Draw the grid lines."""
        for i in range(self.canvas_pixel_count + 1):
            # Vertical line.
            self.canvas.create_line(
                i * self.canvas_pixel_size, 0,
                i * self.canvas_pixel_size, self.canvas_height,
                fill='lightgray'
            )
            # Horizontal line.
            self.canvas.create_line(
                0, i * self.canvas_pixel_size,
                self.canvas_width, i * self.canvas_pixel_size,
                fill='lightgray'
            )

    def paint(self, event):
        """Handle mouse drawing events."""
        # Compute the grid coordinate that was clicked.
        col = event.x // self.canvas_pixel_size
        row = event.y // self.canvas_pixel_size
        # Make sure the coordinate is within range.
        if 0 <= col < self.canvas_pixel_count and 0 <= row < self.canvas_pixel_count:
            # Update the grid state.
            if not self.canvas_data[row][col]:
                self.canvas_data[row][col] = True
                # Draw a white rectangle.
                x1 = col * self.canvas_pixel_size
                y1 = row * self.canvas_pixel_size
                x2 = x1 + self.canvas_pixel_size
                y2 = y1 + self.canvas_pixel_size
                self.canvas.create_rectangle(x1, y1, x2, y2, fill='white', outline='')
    # endregion

    # region: table part
    value_labels: list[tk.Label]

    def show_in_table(self, result: PredictResult):
        self.value_labels[0].config(text=str(result.chosen_number()))
        number_possibilities = result.number_possibilities()
        for index, label in enumerate(self.value_labels[1:]):
            label.config(text=f'{number_possibilities[index]:.4f}')

    def clear_table(self):
        for label in self.value_labels:
            label.config(text='N/A')
    # endregion

    # region: button part
    predictor: Predictor

    def execute(self):
        """Execute button: pass the sketchpad data to the backend."""
        prediction = self.predictor.predict_sketchpad(self.canvas_data)
        self.show_in_table(prediction)

    def reset(self):
        """Reset button: clear the sketchpad."""
        self.canvas.delete("all")
        self.canvas_data = [[False for _ in range(self.canvas_pixel_count)] for _ in range(self.canvas_pixel_count)]
        self.draw_grid()
        self.clear_table()
    # endregion


if __name__ == "__main__":
    gpu_utils.print_gpu_availability()
    predictor = Predictor()
    root = tk.Tk()
    app = SketchpadApp(root, predictor)
    root.mainloop()


@@ -0,0 +1,85 @@
from pathlib import Path
import sys
import typing
import torch
import torchinfo
import ignite.engine
import ignite.metrics
from ignite.engine import Engine, Events
from ignite.handlers.tqdm_logger import ProgressBar
from dataset import MnistDataLoaders
from model import Cnn
import settings
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
import gpu_utils


class Trainer:
    """The core trainer."""
    device: torch.device
    data_source: MnistDataLoaders
    model: Cnn
    trainer: Engine
    evaluator: Engine
    pbar: ProgressBar

    def __init__(self):
        # Create the training device, the model and the data loaders.
        self.device = gpu_utils.get_gpu_device()
        self.model = Cnn().to(self.device)
        self.data_source = MnistDataLoaders(batch_size=settings.N_BATCH_SIZE)
        # Show the model structure: the configured batch size, one grayscale channel, 28x28.
        torchinfo.summary(self.model, (settings.N_BATCH_SIZE, 1, 28, 28))
        # Optimizer and loss function.
        optimizer = torch.optim.Adam(self.model.parameters(), eps=1e-7)
        criterion = torch.nn.CrossEntropyLoss()
        # Create the trainer.
        self.trainer = ignite.engine.create_supervised_trainer(
            self.model, optimizer, criterion, self.device)
        # Attach a progress bar to the trainer.
        self.pbar = ProgressBar(persist=True)
        self.pbar.attach(self.trainer, output_transform=lambda loss: {"loss": loss})
        # Create the metrics for the test evaluator.
        evaluator_metrics = {
            # This Accuracy metric takes logits rather than probabilities,
            # so once again the result does not need to go through softmax.
            "accuracy": ignite.metrics.Accuracy(device=self.device),
            "loss": ignite.metrics.Loss(criterion, device=self.device)
        }
        # Create the test evaluator.
        self.evaluator = ignite.engine.create_supervised_evaluator(
            self.model, metrics=evaluator_metrics, device=self.device)

    def train_model(self):
        # Train the model.
        self.trainer.run(self.data_source.train_loader, max_epochs=settings.N_EPOCH)

    def save_model(self):
        # Make sure the folder the model is saved into exists.
        settings.SAVED_MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)
        # Save only the model parameters.
        torch.save(self.model.state_dict(), settings.SAVED_MODEL_PATH)
        print(f'Model was saved into: {settings.SAVED_MODEL_PATH}')

    def test_model(self):
        # Test the model and print the results.
        self.evaluator.run(self.data_source.test_loader)
        metrics = self.evaluator.state.metrics
        print(f"Accuracy: {metrics['accuracy']:.4f} Loss: {metrics['loss']:.4f}")


def main():
    trainer = Trainer()
    trainer.train_model()
    trainer.save_model()
    trainer.test_model()


if __name__ == "__main__":
    gpu_utils.print_gpu_availability()
    main()


@@ -0,0 +1,40 @@
from pathlib import Path
import tensorflow as tf
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from train import CNN


class Predict(object):
    def __init__(self):
        latest = tf.train.latest_checkpoint('./ckpt')
        self.cnn = CNN()
        # Restore the network weights.
        self.cnn.model.load_weights(latest)

    def predict(self, image_path):
        # Read the image as grayscale.
        img = Image.open(image_path).convert('L')
        img = np.reshape(img, (28, 28, 1)) / 255.
        x = np.array([1 - img])
        y = self.cnn.model.predict(x)
        # Since x contains only one image, taking y[0] is enough.
        # np.argmax() returns the index of the maximum value, i.e. the predicted digit.
        print(image_path)
        # print(y[0])
        print(' -> Predict digit', np.argmax(y[0]))
        plt.figure("Image")  # window title
        plt.imshow(img)
        plt.axis('on')  # pass 'off' to hide the axes
        plt.title(np.argmax(y[0]))  # figure title; required here, otherwise nothing is shown
        plt.show()


if __name__ == "__main__":
    app = Predict()
    images_dir = Path(__file__).resolve().parent.parent / 'test_images'
    app.predict(images_dir / '0.png')
    app.predict(images_dir / '1.png')
    app.predict(images_dir / '4.png')


@@ -0,0 +1,56 @@
from pathlib import Path
import tensorflow as tf
from tensorflow.keras import datasets, layers, models


class CNN(object):
    def __init__(self):
        model = models.Sequential()
        # Convolution layer 1: 32 kernels of size 3*3; 28*28 is the size of the training images.
        model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
        model.add(layers.MaxPooling2D((2, 2)))
        # Convolution layer 2: 64 kernels of size 3*3.
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.MaxPooling2D((2, 2)))
        # Convolution layer 3: 64 kernels of size 3*3.
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.Flatten())
        model.add(layers.Dense(64, activation='relu'))
        model.add(layers.Dense(10, activation='softmax'))
        model.summary()
        self.model = model


class DataSource(object):
    def __init__(self):
        # Where the MNIST dataset is stored; it is downloaded automatically if missing.
        data_path = Path(__file__).resolve().parent.parent / 'datasets' / 'mnist.npz'
        (train_images, train_labels), (test_images,
                                       test_labels) = datasets.mnist.load_data(path=data_path)
        # 60,000 training images and 10,000 test images.
        train_images = train_images.reshape((60000, 28, 28, 1))
        test_images = test_images.reshape((10000, 28, 28, 1))
        # Map the pixel values into the 0-1 range.
        train_images, test_images = train_images / 255.0, test_images / 255.0
        self.train_images, self.train_labels = train_images, train_labels
        self.test_images, self.test_labels = test_images, test_labels


class Train:
    def __init__(self):
        self.cnn = CNN()
        self.data = DataSource()

    def train(self):
        check_path = Path(__file__).resolve().parent.parent / 'models' / 'cnn.ckpt'
        # period: save once every 5 epochs.
        save_model_cb = tf.keras.callbacks.ModelCheckpoint(
            str(check_path), save_weights_only=True, verbose=1, period=5)
        self.cnn.model.compile(optimizer='adam',
                               loss='sparse_categorical_crossentropy',
                               metrics=['accuracy'])
        self.cnn.model.fit(self.data.train_images, self.data.train_labels,
                           epochs=5, batch_size=1000, callbacks=[save_model_cb])
        test_loss, test_acc = self.cnn.model.evaluate(
            self.data.test_images, self.data.test_labels)
        print("Accuracy: %.4f, tested %d images" % (test_acc, len(self.data.test_labels)))


if __name__ == "__main__":
    app = Train()
    app.train()

2
dl-exp/exp2/test_images/.gitignore vendored Normal file

@@ -0,0 +1,2 @@
# Ignore all test images
*.png

3
dl-exp/exp3/datasets/.gitignore vendored Normal file

@@ -0,0 +1,3 @@
# Ignore datasets and processed datasets
*.txt
*.pickle

2
dl-exp/exp3/models/.gitignore vendored Normal file

@@ -0,0 +1,2 @@
# Ignore every saved model files
*.pth


@@ -0,0 +1,271 @@
from pathlib import Path
import typing
import pickle
from collections import Counter
import numpy
import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import settings

TOKEN_PAD: str = '[PAD]'
"""Special token for the poetry data: the padding character used by the RNN."""
TOKEN_UNK: str = '[UNK]'
"""Special token for the poetry data: a rare character whose frequency is too low."""
TOKEN_CLS: str = '[CLS]'
"""Special token for the poetry data: marks the start of a poem."""
TOKEN_SEP: str = '[SEP]'
"""Special token for the poetry data: marks the end of a poem."""


class Tokenizer:
    """The tokenizer."""
    token_dict: dict[str, int]
    """Token -> id mapping."""
    token_dict_rev: dict[int, str]
    """Id -> token mapping."""
    vocab_size: int
    """Vocabulary size."""

    def __init__(self, token_dict: dict[str, int]):
        self.token_dict = token_dict
        self.token_dict_rev = {value: key for key, value in self.token_dict.items()}
        self.vocab_size = len(self.token_dict)

    def id_to_token(self, token_id: int) -> str:
        """
        Given an id, look up the corresponding token in the vocabulary.
        :param token_id: the id of the token to look up
        :return: the token for that id
        """
        return self.token_dict_rev[token_id]

    def token_to_id(self, token: str):
        """
        Given a token, look up its id in the vocabulary.
        Returns the id of the rare-word token [UNK] if not found.
        :param token: the token whose id is looked up
        :return: the id of the token
        """
        return self.token_dict.get(token, self.token_dict['[UNK]'])

    def encode(self, tokens: str) -> list[int]:
        """
        Given a string, add the special start and end markers at its head and tail,
        and convert it into the corresponding id sequence.
        :param tokens: the string to encode
        :return: the id sequence
        """
        # Add the start marker.
        token_ids: list[int] = [self.token_to_id(TOKEN_CLS), ]
        # Append the id sequence of the string.
        for token in tokens:
            token_ids.append(self.token_to_id(token))
        # Add the end marker.
        token_ids.append(self.token_to_id(TOKEN_SEP))
        return token_ids

    def decode(self, token_ids: typing.Iterable[int]) -> str:
        """
        Given an id sequence, decode it back into a string.
        :param token_ids: the id sequence to decode
        :return: the decoded string
        """
        # Treat the start/end markers specially.
        spec_tokens = {TOKEN_CLS, TOKEN_SEP}
        # The list holding the decoded characters.
        tokens: list[str] = []
        for token_id in token_ids:
            token = self.id_to_token(token_id)
            if token in spec_tokens:
                continue
            tokens.append(token)
        # Join into a string.
        return ''.join(tokens)


class PoetryPreprocessor:
    """
    The preprocessor for the poetry dataset.
    This class is responsible for reading, cleaning, and persisting the poetry data.
    """
    tokenizer: Tokenizer
    """The tokenizer."""
    poetry: list[str]
    """The poetry dataset; each entry is one poem."""

    def __init__(self, force_reclean: bool = False):
        # Load the poetry dataset.
        if force_reclean or (not settings.CLEAN_DATASET_PATH.is_file()):
            (self.poetry, self.tokenizer) = self.__load_from_dirty()
        else:
            (self.poetry, self.tokenizer) = self.__load_from_clean()

    def __load_from_clean(self) -> tuple[list[str], Tokenizer]:
        """Read the cleaned data directly."""
        with open(settings.CLEAN_DATASET_PATH, 'rb') as f:
            return pickle.load(f)

    def __load_from_dirty(self) -> tuple[list[str], Tokenizer]:
        """Load the raw data, clean it, write it to the cache file and return the cleaned data."""
        # Load the dirty poetry data.
        with open(settings.DIRTY_DATASET_PATH, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        # Clean the poetry data.
        poetry = self.__wash_dirty_poetry(lines)
        # Build the tokenizer.
        tokenizer = self.__build_tokenizer(poetry)
        # Cleaning is done; write out the clean data.
        with open(settings.CLEAN_DATASET_PATH, 'wb') as f:
            pickle.dump((poetry, tokenizer), f)
        # Return the result.
        return poetry, tokenizer

    def __wash_dirty_poetry(self, poetry: list[str]) -> list[str]:
        """
        Clean the given poetry data.
        :param poetry: the poetry data to clean; each line is one poem.
            A poem starts with a title, then a colon (full- or half-width), then the body.
        :return: the cleaned poems.
        """
        # Disallowed words; poems containing any of these characters are skipped.
        BAD_WORDS = ['(', ')', '(', ')', '__', '《', '》', '【', '】', '[', ']']
        # The dataset list.
        clean_poetry: list[str] = []
        # Process the lines one by one.
        for line in poetry:
            # Strip whitespace.
            line = line.strip()
            # Replace full-width colons with half-width ones.
            line = line.replace(':', ':')
            # There must be exactly one colon, which separates off the title.
            if line.count(':') != 1: continue
            # Take the second half (drop the title).
            _, last_part = line.split(':')
            # The length may not exceed the maximum; subtract 2 because the poem
            # gets the special start and end tokens.
            if len(last_part) > settings.POETRY_MAX_LEN - 2:
                continue
            # It may not contain any disallowed word.
            for bad_word in BAD_WORDS:
                if bad_word in last_part:
                    break
            else:
                # The loop finished normally, so there are no bad words; push onto the list.
                clean_poetry.append(last_part)
        # Return the cleaned result.
        return clean_poetry

    def __build_tokenizer(self, poetry: list[str]) -> Tokenizer:
        """
        Count token frequencies in the given poems and build the tokenizer.
        :param poetry: the cleaned poems, one poem per entry.
        :return: the built tokenizer.
        """
        # Count token frequencies.
        counter: Counter[str] = Counter()
        for line in poetry:
            counter.update(line)
        # Filter out low-frequency tokens.
        tokens = ((token, count) for token, count in counter.items() if count >= settings.POETRY_MIN_WORD_FREQ)
        # Sort by frequency.
        tokens = sorted(tokens, key=lambda x: -x[1])
        # Drop the counts, keeping only the token list.
        tokens = list(token for token, _ in tokens)
        # Prepend the special tokens to the dataset tokens.
        tokens = ['[PAD]', '[UNK]', '[CLS]', '[SEP]'] + tokens
        # Create the dictionary with the token -> id mapping.
        token_id_dict = dict(zip(tokens, range(len(tokens))))
        # Build a tokenizer from the new dictionary.
        tokenizer = Tokenizer(token_id_dict)
        # Return it directly; no shuffling of the data is needed here.
        return tokenizer


class PoetryDataset(Dataset):
    """The poetry Dataset adapted for PyTorch."""
    preprocessor: PoetryPreprocessor

    def __init__(self, poetry: PoetryPreprocessor):
        self.preprocessor = poetry

    def __getitem__(self, index):
        # Fetch and encode the poem.
        poetry = self.preprocessor.poetry[index]
        encoded_poetry = self.preprocessor.tokenizer.encode(poetry)
        # Return the encoded poem directly; padding and the input/output split
        # are handled by the DataLoader.
        return encoded_poetry

    def __len__(self):
        return len(self.preprocessor.poetry)


class PoetryDataLoader:
    """The poetry data loader adapted for PyTorch."""
    preprocessor: PoetryPreprocessor
    dataset: PoetryDataset
    loader: DataLoader

    def __init__(self, batch_size: int, force_reclean: bool = False):
        self.preprocessor = PoetryPreprocessor(force_reclean)
        self.dataset = PoetryDataset(self.preprocessor)
        self.loader = DataLoader(dataset=self.dataset,
                                 batch_size=batch_size,
                                 # pad the poems before returning them
                                 collate_fn=lambda batch: self.__collect_fn(batch),
                                 # shuffle the data to avoid overfitting
                                 shuffle=True)

    def get_vocab_size(self) -> int:
        """A convenience accessor for vocab_size that avoids a chain of calls."""
        return self.preprocessor.tokenizer.vocab_size

    def get_tokenizer(self) -> Tokenizer:
        """A convenience accessor for the Tokenizer that avoids a chain of calls."""
        return self.preprocessor.tokenizer

    def __collect_fn(self, batch: list[list[int]]) -> tuple[torch.Tensor, torch.Tensor]:
        """
        A sample collector suitable for the DataLoader.
        Pads the incoming poem samples and packs them up for returning.
        """
        # Compute the padding length.
        length = max(map(len, batch))
        # Fetch the padding value.
        padding = self.preprocessor.tokenizer.token_to_id(TOKEN_PAD)
        # Start padding.
        padded_batch: list[list[int]] = []
        for entry in batch:
            padding_length = length - len(entry)
            if padding_length > 0:
                # Too short: pad.
                padded_batch.append(numpy.concatenate([entry, [padding] * padding_length]))
            else:
                # Too long: truncate.
                padded_batch.append(entry[:length])
        numpy_batch = numpy.array(padded_batch)
        # Build the input and the output.
        # The input drops the last character; the output drops the first.
        # This lets the RNN learn to infer the next character from the input.
        # Unlike the TensorFlow version, the output needs no one-hot encoding,
        # since CrossEntropyLoss takes class indices directly.
        input = torch.tensor(numpy_batch[:, :-1], dtype=torch.long)
        output = torch.tensor(numpy_batch[:, 1:], dtype=torch.long)
        # Return the result.
        return input, output

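A toy illustration of what __collect_fn produces (a sketch; the token ids are invented): entries are padded to the longest poem in the batch, and the input/target pair is the padded sequence shifted by one position:

import numpy
import torch

batch = [[2, 15, 16, 17, 3], [2, 15, 3]]   # two encoded poems; [CLS]=2, [SEP]=3, [PAD]=0
length = max(map(len, batch))
padded = numpy.array([list(entry) + [0] * (length - len(entry)) for entry in batch])
x = torch.tensor(padded[:, :-1], dtype=torch.long)  # inputs drop the last token
y = torch.tensor(padded[:, 1:], dtype=torch.long)   # targets drop the first token
print(x.tolist())  # [[2, 15, 16, 17], [2, 15, 3, 0]]
print(y.tolist())  # [[15, 16, 17, 3], [15, 3, 0, 0]]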

@@ -0,0 +1,41 @@
import torch
import torch.nn.functional as F


class TimeDistributed(torch.nn.Module):
    """Emulates TensorFlow's TimeDistributed wrapper layer, since PyTorch does not seem to provide one."""
    layer: torch.nn.Module
    """The wrapped inner layer."""

    def __init__(self, layer: torch.nn.Module):
        super(TimeDistributed, self).__init__()
        self.layer = layer

    def forward(self, x: torch.Tensor):
        # Get the batch size, the number of time steps and the number of features.
        batch_size, time_steps, features = x.size()
        # Fold the time-step dimension into the batch dimension before running the layer;
        # to the wrapped layer this just looks like more batch entries.
        x = x.reshape(-1, features)
        outputs: torch.Tensor = self.layer(x)
        # Then restore the time-step dimension.
        outputs = outputs.reshape(batch_size, time_steps, -1)
        return outputs


class Rnn(torch.nn.Module):
    """The recurrent neural network."""

    def __init__(self, vocab_size: int):
        super(Rnn, self).__init__()
        self.embedding = torch.nn.Embedding(vocab_size, 128)
        self.lstm1 = torch.nn.LSTM(128, 128, batch_first=True, dropout=0.5)
        self.lstm2 = torch.nn.LSTM(128, 128, batch_first=True, dropout=0.5)
        self.timedfc = TimeDistributed(torch.nn.Linear(128, vocab_size))

    def forward(self, x):
        x = self.embedding(x)
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x = self.timedfc(x)
        return x

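A quick shape check of the TimeDistributed wrapper, as a sketch (assuming this file is importable as `model`, as predict.py does). Note that nn.Linear already broadcasts over leading dimensions, so for Linear in particular the wrapper mainly keeps the port structurally faithful to the TensorFlow original:

import torch
from model import TimeDistributed

layer = TimeDistributed(torch.nn.Linear(128, 10))
x = torch.zeros(4, 7, 128)   # (batch, time_steps, features)
print(layer(x).shape)        # torch.Size([4, 7, 10]): applied independently per time step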

@@ -0,0 +1,147 @@
from pathlib import Path
import sys
import numpy
import torch
import torch.nn.functional as F
import settings
from dataset import Tokenizer, PoetryDataLoader
from model import Rnn
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
import gpu_utils


def generate_random_poetry(tokenizer: Tokenizer, model: Rnn, device: torch.device, s: str = '') -> str:
    """
    Randomly generate a poem.
    :param tokenizer: the tokenizer
    :param model: the model used to generate the poem
    :param device: the device to run on
    :param s: the starting string for the poem; empty by default
    :return: a string containing one poem
    """
    # Convert the initial string into tokens.
    token_ids = tokenizer.encode(s)
    # Drop the end marker [SEP].
    token_ids = token_ids[:-1]
    while len(token_ids) < settings.POETRY_MAX_LEN:
        # Run a prediction with batch_size=1.
        input = torch.tensor(token_ids, dtype=torch.long).unsqueeze(0)
        output: torch.Tensor = model(input.to(device))
        # Compute the probability distribution of the last character.
        # The sampling below still needs the batch dimension, so the first subscript is ':'.
        # Only the last character is actually being predicted (the others merely drive the
        # inference), so the second subscript is -1 to take that last character.
        # Finally, the distribution should not include [PAD], [UNK] or [CLS], so the third
        # subscript is 3: to drop them. (Those ids are fixed when the Tokenizer is built;
        # see that module for details.)
        possibilities = F.softmax(output[:, -1, 3:], dim=-1)
        # Randomly pick one token according to the predicted probabilities.
        # Replace with argmax for greedy decoding.
        target_index = torch.multinomial(possibilities, num_samples=1)
        # Remember to add back the ids dropped earlier to get the real token id.
        target_id = target_index.item() + 3
        # Append target_id to the sequence.
        token_ids.append(target_id)
        # If target_id is [SEP], the output has ended; stop.
        if target_id == 3: break
    # Decode and return the result.
    return tokenizer.decode(token_ids)


def generate_acrostic(tokenizer: Tokenizer, model: Rnn, device: torch.device, head: str) -> str:
    """
    Randomly generate an acrostic poem.
    :param tokenizer: the tokenizer
    :param model: the model used to generate the poem
    :param device: the device to run on
    :param head: the head characters of the acrostic
    :return: a string containing one poem
    """
    # Initialize token_ids with the empty string.
    token_ids = tokenizer.encode('')
    # Drop the end marker [SEP], keeping only [CLS].
    token_ids = token_ids[:-1]
    # Punctuation: for simplicity only the comma and the full stop count here.
    punctuations = [',', '。']
    punctuation_ids = {tokenizer.token_to_id(token) for token in punctuations}
    # The list that buffers the generated poem.
    poetry: list[str] = []
    # For each character of the acrostic head, generate one short line.
    for ch in head:
        # Record the head character first.
        poetry.append(ch)
        # Convert the head character into a token id.
        token_id = tokenizer.token_to_id(ch)
        # Append it to the list.
        token_ids.append(token_id)
        # Generate one short line.
        while True:
            # Keep generating the next character, the same way as generate_random_poetry.
            input = torch.tensor(token_ids, dtype=torch.long).unsqueeze(0)
            output: torch.Tensor = model(input.to(device))
            possibilities = F.softmax(output[:, -1, 3:], dim=-1)
            target_index = torch.multinomial(possibilities, num_samples=1)
            target_id = target_index.item() + 3
            # Append target_id to the sequence.
            token_ids.append(target_id)
            # Only push the character into the poem when its id is not a special token.
            if target_id > 3: poetry.append(tokenizer.id_to_token(target_id))
            # Unlike above, leave the inner loop on punctuation and move on
            # to the next line of the acrostic.
            if target_id in punctuation_ids: break
    # Join and return the result.
    return ''.join(poetry)


class Predictor:
    device: torch.device
    data_loader: PoetryDataLoader
    model: Rnn

    def __init__(self):
        self.device = gpu_utils.get_gpu_device()
        self.data_loader = PoetryDataLoader(batch_size=settings.N_BATCH_SIZE)
        self.model = Rnn(self.data_loader.get_vocab_size()).to(self.device)
        # Load the saved model parameters.
        self.model.load_state_dict(torch.load(settings.SAVED_MODEL_PATH))
        self.model.eval()

    def generate_random_poetry(self, s: str = ''):
        """Randomly generate a poem."""
        with torch.no_grad():
            print(generate_random_poetry(self.data_loader.get_tokenizer(),
                                         self.model,
                                         self.device,
                                         s))

    def generate_acrostic(self, s: str):
        """Randomly generate an acrostic poem."""
        with torch.no_grad():
            print(generate_acrostic(self.data_loader.get_tokenizer(),
                                    self.model,
                                    self.device,
                                    s))


def main():
    predictor = Predictor()
    # Randomly generate a poem.
    predictor.generate_random_poetry()
    # Given partial information, randomly generate the rest.
    predictor.generate_random_poetry('床前明月光,')
    # Generate an acrostic poem.
    predictor.generate_acrostic('好好学习天天向上')


if __name__ == "__main__":
    gpu_utils.print_gpu_availability()
    main()

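The +3 offset exists because ids 0-2 ([PAD], [UNK], [CLS]) are sliced off before the softmax, so an index into the sliced distribution must be shifted back to a real token id. A minimal sketch of the sampling step, with invented probabilities, next to its greedy alternative:

import torch

probs = torch.tensor([[0.1, 0.7, 0.2]])                       # sliced distribution for ids 3, 4, 5
sampled = torch.multinomial(probs, num_samples=1).item() + 3  # stochastic pick
greedy = probs.argmax(dim=-1).item() + 3                      # deterministic pick
print(sampled, greedy)   # greedy is always 4 here; sampled varies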

@@ -0,0 +1,19 @@
from pathlib import Path

POETRY_MAX_LEN: int = 64
"""Maximum allowed poem length (including the special start/end tokens); longer poems are dropped."""
POETRY_MIN_WORD_FREQ: int = 8
"""Minimum allowed token frequency; rarer tokens are treated as the [UNK] rare character when encoding and decoding."""
DIRTY_DATASET_PATH: Path = Path(__file__).resolve().parent.parent / 'datasets' / 'poetry.txt'
"""Path to the dirty (uncleaned) poetry data."""
CLEAN_DATASET_PATH: Path = Path(__file__).resolve().parent.parent / 'datasets' / 'poetry.pickle'
"""Path to the clean (already cleaned) poetry data."""
SAVED_MODEL_PATH: Path = Path(__file__).resolve().parent.parent / 'models' / 'rnn.pth'
"""Where the trained model is saved."""
N_EPOCH: int = 10
"""Number of epochs during training."""
N_BATCH_SIZE: int = 50
"""Batch size during training."""


@@ -0,0 +1,79 @@
from pathlib import Path
import sys
import typing
import torch
import torchinfo
import ignite.engine
import ignite.metrics
from ignite.engine import Engine, Events
from ignite.handlers.tqdm_logger import ProgressBar
from dataset import PoetryDataLoader
from model import Rnn
from predict import generate_random_poetry
import settings
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
import gpu_utils


class Trainer:
    """The core trainer."""
    device: torch.device
    data_loader: PoetryDataLoader
    model: Rnn
    trainer: Engine
    pbar: ProgressBar

    def __init__(self):
        # Create the training device, the model and the data loader.
        self.device = gpu_utils.get_gpu_device()
        self.data_loader = PoetryDataLoader(batch_size=settings.N_BATCH_SIZE)
        self.model = Rnn(self.data_loader.get_vocab_size()).to(self.device)
        # Show the model structure: the configured batch size by the maximum poem length.
        # The summary input must be an integer type (int32).
        torchinfo.summary(self.model,
                          (settings.N_BATCH_SIZE, settings.POETRY_MAX_LEN),
                          dtypes=[torch.int32,])
        # Optimizer and loss function.
        optimizer = torch.optim.Adam(self.model.parameters(), eps=1e-7)
        criterion = torch.nn.CrossEntropyLoss()
        # Create the trainer.
        self.trainer = ignite.engine.create_supervised_trainer(
            self.model, optimizer, criterion, self.device,
            # PyTorch's cross-entropy always expects the class scores on dim=1,
            # so the dimensions must be permuted before they are passed in.
            model_transform=lambda output: self.__adjust_for_loss(output))
        # Attach a progress bar to the trainer.
        self.pbar = ProgressBar(persist=True)
        self.pbar.attach(self.trainer, output_transform=lambda loss: {"loss": loss})
        # Compose one poem after each epoch to inspect the result.
        self.trainer.add_event_handler(
            Events.EPOCH_COMPLETED,
            lambda: print(generate_random_poetry(self.data_loader.get_tokenizer(), self.model, self.device))
        )

    def __adjust_for_loss(self, output: torch.Tensor) -> torch.Tensor:
        return output.permute(0, 2, 1)

    def train_model(self):
        # Train the model.
        self.trainer.run(self.data_loader.loader, max_epochs=settings.N_EPOCH)

    def save_model(self):
        # Make sure the folder the model is saved into exists.
        settings.SAVED_MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)
        # Save only the model parameters.
        torch.save(self.model.state_dict(), settings.SAVED_MODEL_PATH)
        print(f'Model was saved into: {settings.SAVED_MODEL_PATH}')


def main():
    trainer = Trainer()
    trainer.train_model()
    trainer.save_model()


if __name__ == "__main__":
    gpu_utils.print_gpu_availability()
    main()

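A shape sketch of why __adjust_for_loss permutes: for sequence output, PyTorch's CrossEntropyLoss expects (N, C, T) logits against (N, T) integer targets, while the Rnn emits (N, T, C):

import torch

logits = torch.randn(2, 5, 100)           # (batch, time, vocab) as the Rnn outputs them
targets = torch.randint(0, 100, (2, 5))   # (batch, time) class indices
loss = torch.nn.CrossEntropyLoss()(logits.permute(0, 2, 1), targets)  # (N, C, T) vs (N, T)
print(loss.item())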

@@ -0,0 +1,199 @@
# ANLI College of Artificial Intelligence
from collections import Counter
import math
import numpy as np
import tensorflow as tf
import settings


class Tokenizer:
    """
    The tokenizer.
    """

    def __init__(self, token_dict):
        # Token -> id mapping.
        self.token_dict = token_dict
        # Id -> token mapping.
        self.token_dict_rev = {value: key for key, value in self.token_dict.items()}
        # Vocabulary size.
        self.vocab_size = len(self.token_dict)

    def id_to_token(self, token_id):
        """
        Given an id, look up the corresponding token in the vocabulary.
        :param token_id: the id of the token to look up
        :return: the token for that id
        """
        return self.token_dict_rev[token_id]

    def token_to_id(self, token):
        """
        Given a token, look up its id in the vocabulary.
        Returns the id of the rare-word token [UNK] if not found.
        :param token: the token whose id is looked up
        :return: the id of the token
        """
        return self.token_dict.get(token, self.token_dict['[UNK]'])

    def encode(self, tokens):
        """
        Given a string, add the special start and end markers at its head and tail,
        and convert it into the corresponding id sequence.
        :param tokens: the string to encode
        :return: the id sequence
        """
        # Add the start marker.
        token_ids = [self.token_to_id('[CLS]'), ]
        # Append the id sequence of the string.
        for token in tokens:
            token_ids.append(self.token_to_id(token))
        # Add the end marker.
        token_ids.append(self.token_to_id('[SEP]'))
        return token_ids

    def decode(self, token_ids):
        """
        Given an id sequence, decode it back into a string.
        :param token_ids: the id sequence to decode
        :return: the decoded string
        """
        # Treat the start/end markers specially.
        spec_tokens = {'[CLS]', '[SEP]'}
        # The list holding the decoded characters.
        tokens = []
        for token_id in token_ids:
            token = self.id_to_token(token_id)
            if token in spec_tokens:
                continue
            tokens.append(token)
        # Join into a string.
        return ''.join(tokens)


# Disallowed words.
disallowed_words = settings.DISALLOWED_WORDS
# Maximum sentence length.
max_len = settings.MAX_LEN
# Minimum token frequency.
min_word_frequency = settings.MIN_WORD_FREQUENCY
# Mini-batch size.
batch_size = settings.BATCH_SIZE

# Load the dataset.
with open(settings.DATASET_PATH, 'r', encoding='utf-8') as f:
    lines = f.readlines()
# Normalize the colons to the same form.
lines = [line.replace(':', ':') for line in lines]
# The dataset list.
poetry = []
# Process the lines one by one.
for line in lines:
    # There must be exactly one colon, which separates off the title.
    if line.count(':') != 1:
        continue
    # The second half may not contain any disallowed word.
    __, last_part = line.split(':')
    ignore_flag = False
    for dis_word in disallowed_words:
        if dis_word in last_part:
            ignore_flag = True
            break
    if ignore_flag:
        continue
    # The length may not exceed the maximum.
    if len(last_part) > max_len - 2:
        continue
    poetry.append(last_part.replace('\n', ''))

# Count token frequencies.
counter = Counter()
for line in poetry:
    counter.update(line)
# Filter out low-frequency tokens.
_tokens = [(token, count) for token, count in counter.items() if count >= min_word_frequency]
# Sort by frequency.
_tokens = sorted(_tokens, key=lambda x: -x[1])
# Drop the counts, keeping only the token list.
_tokens = [token for token, count in _tokens]
# Prepend the special tokens to the dataset tokens.
_tokens = ['[PAD]', '[UNK]', '[CLS]', '[SEP]'] + _tokens
# Create the dictionary with the token -> id mapping.
token_id_dict = dict(zip(_tokens, range(len(_tokens))))
# Build a tokenizer from the new dictionary.
tokenizer = Tokenizer(token_id_dict)
# Shuffle the data.
np.random.shuffle(poetry)


class PoetryDataGenerator:
    """
    The poetry dataset generator.
    """

    def __init__(self, data, random=False):
        # The dataset.
        self.data = data
        # Batch size.
        self.batch_size = batch_size
        # Number of steps per epoch.
        self.steps = int(math.floor(len(self.data) / self.batch_size))
        # Whether to shuffle at the start of each epoch.
        self.random = random

    def sequence_padding(self, data, length=None, padding=None):
        """
        Pad the given data to the same length.
        :param data: the data to pad
        :param length: the length after padding; defaults to the maximum length found in data
        :param padding: the value used for padding; defaults to the id of [PAD]
        :return: the padded data
        """
        # Compute the padding length.
        if length is None:
            length = max(map(len, data))
        # Compute the padding value.
        if padding is None:
            padding = tokenizer.token_to_id('[PAD]')
        # Start padding.
        outputs = []
        for line in data:
            padding_length = length - len(line)
            # Too short: pad.
            if padding_length > 0:
                outputs.append(np.concatenate([line, [padding] * padding_length]))
            # Too long: truncate.
            else:
                outputs.append(line[:length])
        return np.array(outputs)

    def __len__(self):
        return self.steps

    def __iter__(self):
        total = len(self.data)
        # Shuffle if requested.
        if self.random:
            np.random.shuffle(self.data)
        # Iterate one epoch, yielding one batch at a time.
        for start in range(0, total, self.batch_size):
            end = min(start + self.batch_size, total)
            batch_data = []
            # Encode the poems one by one.
            for single_data in self.data[start:end]:
                batch_data.append(tokenizer.encode(single_data))
            # Pad them to the same length.
            batch_data = self.sequence_padding(batch_data)
            # Yield x, y.
            yield batch_data[:, :-1], tf.one_hot(batch_data[:, 1:], tokenizer.vocab_size)
            del batch_data

    def for_fit(self):
        """
        Create a generator used for training.
        """
        # Loop forever: once the data has been through one epoch, iterate it again.
        while True:
            # Delegate to the generator.
            yield from self.__iter__()

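The generator one-hot encodes its targets because train.py compiles the model with categorical_crossentropy. A small sketch (invented shapes) showing that sparse_categorical_crossentropy on the raw integer ids gives the same values, which is the alternative that would make the tf.one_hot step unnecessary:

import tensorflow as tf

labels = tf.constant([[1, 3]])          # integer targets, shape (batch, time)
probs = tf.nn.softmax(tf.random.uniform((1, 2, 5)))  # (batch, time, vocab) probabilities
dense = tf.keras.losses.categorical_crossentropy(tf.one_hot(labels, 5), probs)
sparse = tf.keras.losses.sparse_categorical_crossentropy(labels, probs)
print(dense.numpy(), sparse.numpy())    # identical values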

@@ -0,0 +1,16 @@
# ANLI College of Artificial Intelligence
import tensorflow as tf
from dataset import tokenizer
import settings
import utils

# Load the trained model.
model = tf.keras.models.load_model(settings.BEST_MODEL_PATH)
# Randomly generate a poem.
print(utils.generate_random_poetry(tokenizer, model))
# Given partial information, randomly generate the rest.
print(utils.generate_random_poetry(tokenizer, model, s='床前明月光,'))
# Generate an acrostic poem.
print(utils.generate_acrostic(tokenizer, model, head='好好学习天天向上'))


@@ -0,0 +1,19 @@
# ANLI College of Artificial Intelligence
# Disallowed words; poems containing any of these characters are ignored.
DISALLOWED_WORDS = ['(', ')', '(', ')', '__', '《', '》', '【', '】', '[', ']']
# Maximum sentence length.
MAX_LEN = 64
# Minimum token frequency.
MIN_WORD_FREQUENCY = 8
# Training batch size.
BATCH_SIZE = 16
# Dataset path.
DATASET_PATH = './poetry.txt'
# After each epoch, randomly generate SHOW_NUM poems as a demonstration.
SHOW_NUM = 5
# Total number of epochs to train.
TRAIN_EPOCHS = 10
# Where the best weights are saved.
BEST_MODEL_PATH = './best_model.h5'


@@ -0,0 +1,35 @@
import tensorflow as tf
from dataset import PoetryDataGenerator, tokenizer, poetry
import settings
import utils

model = tf.keras.Sequential([
    tf.keras.layers.Input((None,)),
    tf.keras.layers.Embedding(input_dim=tokenizer.vocab_size, output_dim=128),
    tf.keras.layers.LSTM(128, dropout=0.5, return_sequences=True),
    tf.keras.layers.LSTM(128, dropout=0.5, return_sequences=True),
    tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(tokenizer.vocab_size, activation='softmax')),
])
model.summary()
model.compile(optimizer=tf.keras.optimizers.Adam(), loss=tf.keras.losses.categorical_crossentropy)


class Evaluate(tf.keras.callbacks.Callback):
    def __init__(self):
        super().__init__()
        self.lowest = 1e10

    def on_epoch_end(self, epoch, logs=None):
        # Save the model whenever the loss reaches a new low.
        if logs['loss'] <= self.lowest:
            self.lowest = logs['loss']
            model.save(settings.BEST_MODEL_PATH)
        # Generate a few poems to show the current state.
        print()
        for i in range(settings.SHOW_NUM):
            print(utils.generate_random_poetry(tokenizer, model))


data_generator = PoetryDataGenerator(poetry, random=False)
model.fit_generator(data_generator.for_fit(),
                    steps_per_epoch=data_generator.steps,
                    epochs=settings.TRAIN_EPOCHS,
                    callbacks=[Evaluate()])


@@ -0,0 +1,86 @@
import numpy as np
import settings


def generate_random_poetry(tokenizer, model, s=''):
    """
    Randomly generate a poem.
    :param tokenizer: the tokenizer
    :param model: the model used to generate the poem
    :param s: the starting string for the poem; empty by default
    :return: a string containing one poem
    """
    # Convert the initial string into tokens.
    token_ids = tokenizer.encode(s)
    # Drop the end marker [SEP].
    token_ids = token_ids[:-1]
    while len(token_ids) < settings.MAX_LEN:
        # Run a prediction. Keep only the distribution for the first sample (we feed
        # exactly one), for the last token, excluding [PAD], [UNK] and [CLS].
        output = model(np.array([token_ids, ], dtype=np.int32))
        _probas = output.numpy()[0, -1, 3:]
        del output
        # print(_probas)
        # Sort all tokens by probability in descending order and keep the top 100.
        p_args = _probas.argsort()[::-1][:100]
        # The probabilities in that order.
        p = _probas[p_args]
        # Normalize the probabilities first.
        p = p / sum(p)
        # Then randomly pick one token according to the predicted probabilities.
        target_index = np.random.choice(len(p), p=p)
        target = p_args[target_index] + 3
        # Save it.
        token_ids.append(target)
        if target == 3:
            break
    return tokenizer.decode(token_ids)


def generate_acrostic(tokenizer, model, head):
    """
    Randomly generate an acrostic poem.
    :param tokenizer: the tokenizer
    :param model: the model used to generate the poem
    :param head: the head characters of the acrostic
    :return: a string containing one poem
    """
    # Initialize token_ids with the empty string, which adds [CLS].
    token_ids = tokenizer.encode('')
    token_ids = token_ids[:-1]
    # Punctuation: for simplicity only the comma and the full stop count here.
    punctuations = [',', '。']
    punctuation_ids = {tokenizer.token_to_id(token) for token in punctuations}
    # The list that buffers the generated poem.
    poetry = []
    # For each character of the acrostic head, generate one short line.
    for ch in head:
        # Record the head character first.
        poetry.append(ch)
        # Convert the head character into a token id.
        token_id = tokenizer.token_to_id(ch)
        # Append it to the list.
        token_ids.append(token_id)
        # Generate one short line.
        while True:
            # Run a prediction. Keep only the distribution for the first sample (we feed
            # exactly one), for the last token, excluding [PAD], [UNK] and [CLS].
            output = model(np.array([token_ids, ], dtype=np.int32))
            _probas = output.numpy()[0, -1, 3:]
            del output
            # Sort all tokens by probability in descending order and keep the top 100.
            p_args = _probas.argsort()[::-1][:100]
            # The probabilities in that order.
            p = _probas[p_args]
            # Normalize the probabilities first.
            p = p / sum(p)
            # Then randomly pick one token according to the predicted probabilities.
            target_index = np.random.choice(len(p), p=p)
            target = p_args[target_index] + 3
            # Save it.
            token_ids.append(target)
            # Only save into poetry when the token is not a special one.
            if target > 3:
                poetry.append(tokenizer.id_to_token(target))
            if target in punctuation_ids:
                break
    return ''.join(poetry)

17
dl-exp/gpu_utils.py Normal file

@@ -0,0 +1,17 @@
import torch


def print_gpu_availability():
    """Print whether a GPU is available to PyTorch."""
    if torch.cuda.is_available():
        print(f"GPU available: {torch.cuda.get_device_name(0)}")
    else:
        print("GPU not available")


def get_gpu_device() -> torch.device:
    """Get the PyTorch GPU device."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    else:
        raise Exception("CUDA not found")

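Every entry point in this commit calls get_gpu_device() and therefore fails hard without CUDA. If a CPU fallback were ever wanted, a hypothetical variant (not part of this commit) could be:

def get_device_or_cpu() -> torch.device:
    """Hypothetical helper: prefer CUDA, but fall back to the CPU instead of raising."""
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")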
29
dl-exp/pyproject.toml Normal file

@@ -0,0 +1,29 @@
[project]
name = "dlexperiment"
version = "0.1.0"
description = "The code for deep learning experiment course."
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"datasets>=4.3.0",
"matplotlib>=3.10.7",
"numpy>=2.3.4",
"pillow>=12.0.0",
"pytorch-ignite>=0.5.3",
"torch>=2.9.0",
"torchinfo>=1.8.0",
"torchvision>=0.24.0",
]
[tool.uv.sources]
torch = [
{ index = "pytorch-cu126", marker = "sys_platform == 'linux' or sys_platform == 'win32'" },
]
torchvision = [
{ index = "pytorch-cu126", marker = "sys_platform == 'linux' or sys_platform == 'win32'" },
]
[[tool.uv.index]]
name = "pytorch-cu126"
url = "https://download.pytorch.org/whl/cu126"
explicit = true

2243
dl-exp/uv.lock generated Normal file

File diff suppressed because it is too large.