

图像压缩是数字图像处理领域的核心技术之一,小到手机拍照存储、微信发图,大到视频监控、卫星图像传输,都离不开图像压缩技术的支撑。本文将系统讲解图像压缩的基础理论、常用压缩方法及数字图像水印技术,并通过可直接运行的 Python 代码 + 直观的效果对比图,让你从零掌握图像压缩的核心知识。
编码冗余是指对图像数据进行编码时,使用了过多的比特数来表示相同或相似的信息。比如用 8 位(0-255)表示一个灰度值,但实际图像中大部分灰度值出现频率极低,这种 “一刀切” 的编码方式就造成了冗余。

空间冗余:图像中相邻像素往往具有相似的灰度值(比如蓝天、白墙区域),这种像素间的相关性就是空间冗余。
时间冗余:视频序列中相邻帧的内容高度相似(比如静态场景的视频),帧间的相关性就是时间冗余。
无关信息是指人眼无法感知或感知不敏感的信息,比如超出人眼视觉阈值的高频细节、人眼对亮度敏感但对色度不敏感的特性,这些信息可以在压缩时去除。
图像信息的度量核心是熵(Entropy),熵代表了表示信源所需的最小平均比特数,是图像压缩的理论下限。

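图像压缩分为无损压缩(解压后与原图完全一致)和有损压缩(解压后与原图有误差),保真度准则用于衡量压缩后图像的失真程度。本文代码统一使用两个客观保真度准则:均方误差 $\mathrm{MSE}=\frac{1}{MN}\sum_{x=0}^{M-1}\sum_{y=0}^{N-1}\left[f(x,y)-\hat f(x,y)\right]^2$ 和峰值信噪比 $\mathrm{PSNR}=10\log_{10}\frac{255^2}{\mathrm{MSE}}$(单位 dB);其中 $f$ 为原图,$\hat f$ 为解压后的图像,MSE 为 0(PSNR 为无穷大)表示完全无损。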

图像压缩模型分为编码(压缩)和解码(解压)两部分,核心流程如下:原始图像 → 映射(变换 / 预测)→ 量化(仅有损压缩)→ 符号(熵)编码 → 压缩码流 → 符号解码 → 反映射 → 重建图像。


霍夫曼编码是无损压缩的经典算法,核心思想:对出现频率高的灰度值分配短编码,频率低的分配长编码。
import cv2
import numpy as np
import matplotlib.pyplot as plt
import heapq
from collections import defaultdict
import pickle
# 设置matplotlib支持中文显示
plt.rcParams['font.sans-serif'] = ['SimHei'] # 黑体
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
# ===================== 霍夫曼编码核心类 =====================
class HuffmanNode:
    """Node of a Huffman tree.

    A leaf carries a ``symbol`` (a grey level); an internal node leaves
    ``symbol`` as ``None`` and links to its two children instead.
    """

    def __init__(self, prob, symbol=None, left=None, right=None):
        # Weight of the subtree and, for leaves, the symbol it stands for.
        self.prob, self.symbol = prob, symbol
        # Child links; both stay None on a leaf.
        self.left, self.right = left, right

    def __lt__(self, other):
        """Order nodes by probability so ``heapq`` can rank them."""
        return other.prob > self.prob
# 生成霍夫曼编码表
def build_huffman_code(node, code="", code_dict=None):
    """Build the symbol -> bit-string table for a Huffman tree.

    Args:
        node: Root of the (sub)tree. Any object exposing ``symbol``,
            ``left`` and ``right`` works; a node whose ``symbol`` is not
            ``None`` is treated as a leaf.
        code: Bit-string prefix already accumulated for ``node``.
        code_dict: Optional dict to fill in; a new one is created when
            omitted.

    Returns:
        The dict mapping every leaf symbol to its code. Left edges add
        ``"0"``, right edges add ``"1"``.
    """
    if code_dict is None:
        code_dict = {}
    # Explicit stack instead of recursion; the right child is pushed first
    # so that leaves are still visited left-to-right.
    pending = [(node, code)]
    while pending:
        current, prefix = pending.pop()
        if current.symbol is not None:
            # A tree consisting of a single leaf would otherwise get the
            # empty code, which produces an empty, undecodable bitstream.
            code_dict[current.symbol] = prefix or "0"
            continue
        pending.append((current.right, prefix + "1"))
        pending.append((current.left, prefix + "0"))
    return code_dict
# 霍夫曼编码
def huffman_encode(image):
    """Huffman-encode the grey levels of ``image``.

    Args:
        image: NumPy array of pixel values (8 bits per pixel is assumed
            when computing the compression ratio).

    Returns:
        ``(encoded_str, code_dict, compression_ratio)`` where
        ``encoded_str`` is a string of ``"0"``/``"1"`` characters,
        ``code_dict`` maps each grey level to its code and
        ``compression_ratio`` is original bits / encoded bits.

    Raises:
        ValueError: If ``image`` contains no pixels.
    """
    flat_img = image.flatten()
    total_pixels = flat_img.size
    if total_pixels == 0:
        raise ValueError("cannot Huffman-encode an empty image")

    # Count each grey level once instead of accumulating 1/N per pixel,
    # which is slower and accumulates floating-point error.
    symbols, counts = np.unique(flat_img, return_counts=True)
    heap = [HuffmanNode(count / total_pixels, symbol)
            for symbol, count in zip(symbols, counts)]
    heapq.heapify(heap)

    # Repeatedly merge the two least probable subtrees.
    while len(heap) > 1:
        left = heapq.heappop(heap)
        right = heapq.heappop(heap)
        heapq.heappush(
            heap, HuffmanNode(left.prob + right.prob, left=left, right=right)
        )

    code_dict = build_huffman_code(heap[0])
    if len(code_dict) == 1:
        # An image with a single grey level yields a one-leaf tree; make
        # sure its symbol still costs one bit so the stream is decodable
        # and the ratio below does not divide by zero.
        code_dict = {symbol: (code or "0") for symbol, code in code_dict.items()}

    encoded_str = "".join(code_dict[pixel] for pixel in flat_img)

    original_bits = total_pixels * 8  # 8 bits per source pixel
    compression_ratio = original_bits / len(encoded_str)
    return encoded_str, code_dict, compression_ratio
# 霍夫曼解码
def huffman_decode(encoded_str, code_dict, img_shape):
    """Decode a Huffman bit string back into an image array.

    Args:
        encoded_str: String of ``"0"``/``"1"`` characters.
        code_dict: Mapping from grey level to its code.
        img_shape: Shape the decoded pixels are reshaped to.

    Returns:
        NumPy array of shape ``img_shape`` holding the decoded pixels.
    """
    # Invert the table: code -> grey level.
    symbol_of = dict(zip(code_dict.values(), code_dict.keys()))

    pixels = []
    pending = ""
    for bit in encoded_str:
        pending = pending + bit
        if pending not in symbol_of:
            # Not a complete code word yet; keep collecting bits.
            continue
        pixels.append(symbol_of[pending])
        pending = ""

    return np.array(pixels).reshape(img_shape)
# ===================== 主函数:霍夫曼编码实战 =====================
if __name__ == "__main__":
    # 1. Load the test image as a single-channel grey-level array.
    img = cv2.imread("test_img.jpg", cv2.IMREAD_GRAYSCALE)
    if img is None:
        print("请确保当前目录下有test_img.jpg文件!")
        raise SystemExit(1)

    # 2. Huffman encoding.
    encoded_str, code_dict, compression_ratio = huffman_encode(img)
    print(f"霍夫曼编码压缩比:{compression_ratio:.2f}")

    # 3. Huffman decoding.
    decoded_img = huffman_decode(encoded_str, code_dict, img.shape)

    # 4. PSNR as a losslessness check. The difference is taken in float64
    #    because uint8 subtraction wraps around, and MSE == 0 (the expected
    #    lossless outcome) is handled explicitly instead of dividing by zero.
    diff = img.astype(np.float64) - decoded_img.astype(np.float64)
    mse = np.mean(diff ** 2)
    psnr = float("inf") if mse == 0 else 10 * np.log10((255 ** 2) / mse)
    print(f"解码后图像PSNR:{psnr:.2f} dB(PSNR无穷大表示完全无损)")

    # 5. Side-by-side comparison of the original and the decoded image.
    plt.figure(figsize=(12, 6))
    panels = [
        (img, "原始图像"),
        (decoded_img, f"霍夫曼解码后图像(压缩比:{compression_ratio:.2f})"),
    ]
    for slot, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.imshow(picture, cmap="gray")
        plt.title(caption)
        plt.axis("off")
    plt.tight_layout()
    plt.show()
代码说明:HuffmanNode 定义霍夫曼树节点,包含概率、符号、左右子节点;build_huffman_code 遍历霍夫曼树生成编码表;huffman_encode 依次完成"统计灰度值频率 → 构建霍夫曼树 → 生成编码 → 计算压缩比";huffman_decode 按编码表逐位解码并恢复图像。

戈伦布编码适用于非负整数的压缩,核心是将数字分为"商"和"余数"两部分编码(商用一元码表示,余数用定长二进制表示),对小数值压缩效率极高。
import cv2
import numpy as np
import matplotlib.pyplot as plt
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 戈伦布编码(参数m可自定义,优化m的计算逻辑)
def golomb_encode(num, m=16):
    """Encode one non-negative integer with a Golomb(-Rice) code.

    The quotient ``num // m`` is written in unary (that many ``"1"`` bits
    followed by a ``"0"``), then the remainder ``num % m`` is written as a
    fixed ``k``-bit binary number with ``k = floor(log2(m))``.

    Args:
        num: Non-negative integer to encode.
        m: Positive divisor. The fixed-width remainder can only represent
            every remainder when ``m`` is a power of two.

    Returns:
        The code word as a string of ``"0"``/``"1"`` characters.

    Raises:
        ValueError: If ``num`` is negative, ``m`` is not positive, or the
            remainder does not fit in ``k`` bits (possible only when ``m``
            is not a power of two).
    """
    if num < 0:
        raise ValueError("戈伦布编码仅支持非负整数")
    if m < 1:
        raise ValueError("参数m必须为正整数")

    q, r = divmod(num, m)
    k = m.bit_length() - 1  # 2^k <= m < 2^(k+1)

    # Unary quotient: q ones terminated by a zero.
    q_code = "1" * q + "0"

    r = int(r)
    if r >> k:
        # Emitting more than k bits here would silently desynchronise any
        # decoder that reads exactly k remainder bits.
        raise ValueError("余数超出k位二进制的表示范围,m应取2的幂")

    # k == 0 (m == 1) means there is no remainder field at all; the old
    # bin()/zfill() form emitted a stray "0" in that case.
    r_code = format(r, f"0{k}b") if k else ""
    return q_code + r_code
# 戈伦布解码
def golomb_decode(code, m=16):
    """Decode a single Golomb code word.

    Args:
        code: String of ``"0"``/``"1"`` characters holding one code word.
        m: Divisor that was used when encoding.

    Returns:
        The decoded integer ``q * m + r``. A truncated code word is decoded
        from whatever bits are present (a missing remainder counts as 0).
    """
    # The quotient is the length of the leading run of ones.
    q = len(code) - len(code.lstrip("1"))
    if q >= len(code):
        # Nothing but ones: no separator and no remainder available.
        return q * m

    start = q + 1  # first bit after the "0" separator
    k = m.bit_length() - 1
    # Slicing clamps at the end of the string, so a short tail is accepted.
    r_bits = code[start:start + k]
    remainder = int(r_bits, 2) if r_bits else 0
    return q * m + remainder
# 图像戈伦布编码压缩(优化编码效率)
def image_golomb_compress(image, m=16):
    """Golomb-encode every pixel of ``image`` into one bit string.

    Args:
        image: NumPy array of non-negative pixel values (8 bits per pixel
            is assumed for the compression ratio).
        m: Golomb divisor passed to ``golomb_encode``.

    Returns:
        ``(encoded_str, compression_ratio, avg_bits_per_pixel)``. For an
        image without pixels the result is ``("", 0.0, 0.0)``.
    """
    flat_img = image.flatten()
    # Join once instead of growing the string pixel by pixel.
    encoded_str = "".join(golomb_encode(pixel, m) for pixel in flat_img)

    pixel_count = len(flat_img)
    compressed_bits = len(encoded_str)
    if compressed_bits == 0:
        # Every code word has at least one bit, so this only happens for an
        # empty image; avoid the 0/0 division.
        return encoded_str, 0.0, 0.0

    original_bits = pixel_count * 8  # 8 bits per source pixel
    compression_ratio = original_bits / compressed_bits
    avg_bits_per_pixel = compressed_bits / pixel_count
    return encoded_str, compression_ratio, avg_bits_per_pixel
# 图像戈伦布解码(优化边界处理)
def image_golomb_decompress(encoded_str, img_shape, m=16):
decoded_pixels = []
current_pos = 0
total_pixels = img_shape[0] * img_shape[1]
while current_pos < len(encoded_str) and len(decoded_pixels) < total_pixels:
# 解码商
q = 0
while current_pos < len(encoded_str) and encoded_str[current_pos] == "1":
q += 1
current_pos += 1
# 跳过0(如果到末尾,停止解码)
if current_pos >= len(encoded_str):
break
current_pos += 1 # 跳过0
# 解码余数
k = m.bit_length() - 1
if current_pos + k > len(encoded_str):
r_code = encoded_str[current_pos:]
current_pos = len(encoded_str)
else:
r_code = encoded_str[current_pos:current_pos + k]
current_pos += k
r = int(r_code, 2) if r_code else 0
decoded_pixels.append(q * m + r)
# 确保解码后的像素数和原图一致(补零)
if len(decoded_pixels) < total_pixels:
decoded_pixels += [0] * (total_pixels - len(decoded_pixels))
return np.array(decoded_pixels).reshape(img_shape)
# 自动选择最优m值(基于图像灰度分布)
def select_optimal_m(image):
    """Pick a power-of-two Golomb divisor from the mean grey level.

    Args:
        image: NumPy array of pixel values.

    Returns:
        ``2 ** floor(log2(mean))`` limited to the range 4..64; 8 is used
        when the mean is not positive.
    """
    mean_gray = np.mean(image)
    # log2 is undefined for a non-positive mean; fall back to exponent 3.
    exponent = 3 if not mean_gray > 0 else int(np.log2(mean_gray))
    # Keep m within [4, 64] so it is neither too small nor too large.
    return max(4, min(64, 2 ** exponent))
# 主函数
if __name__ == "__main__":
    # Load the grey-level test image (replace the path with your own file).
    img = cv2.imread("../picture/HuTao.png", cv2.IMREAD_GRAYSCALE)
    if img is None:
        print("图像读取失败!请检查路径是否正确,或替换为存在的图像文件。")
        exit()

    # Choose the divisor m from the image's grey-level distribution.
    optimal_m = select_optimal_m(img)
    print(f"根据图像灰度分布,自动选择最优m值:{optimal_m}")

    # Compress with that m and report the statistics.
    encoded_str, compression_ratio, avg_bits = image_golomb_compress(img, m=optimal_m)
    print(f"戈伦布编码压缩比:{compression_ratio:.2f}")
    print(f"平均每像素编码比特数:{avg_bits:.2f}(原始8位)")

    # Decompress again.
    decoded_img = image_golomb_decompress(encoded_str, img.shape, m=optimal_m)

    # PSNR; an MSE of zero is reported as lossless instead of dividing by it.
    mse = np.mean((img - decoded_img) ** 2)
    psnr = "∞(无损压缩)" if mse == 0 else f"{10 * np.log10((255 ** 2) / mse):.2f} dB"
    print(f"解码后PSNR:{psnr}")

    # Show the original next to the decoded image.
    plt.figure(figsize=(12, 6))
    panels = [
        (img, "原始图像"),
        (decoded_img, f"戈伦布解码后图像(压缩比:{compression_ratio:.2f})"),
    ]
    for slot, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.imshow(picture, cmap="gray")
        plt.title(caption)
        plt.axis("off")
    plt.tight_layout()
    plt.show()
算术编码是一种无损压缩算法,核心是将整个数据序列映射到 [0,1) 区间内的一个小数。霍夫曼编码必须为每个符号分配整数个比特(每个符号至少 1 位),而算术编码对整个序列统一编码,平均每个符号的码长可以低于 1 位,更接近熵给出的理论下限,压缩效率更高。
import cv2
import numpy as np
import matplotlib.pyplot as plt
import time
from numba import njit # 关键:导入Numba即时编译
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# -------------------------- 核心加速:Numba编译的编码函数 --------------------------
@njit(fastmath=True, cache=True)  # fastmath enables fast-math optimisations; cache stores the compiled result
def numba_encode_core(flat_img, sorted_gray, cum_prob, precision):
    """
    Core encoding loop, compiled by Numba.

    Starting from the integer interval [0, 2**precision - 1], the interval
    is narrowed once per pixel using the cumulative probabilities and then
    renormalised. The midpoint of the final interval is returned.

    Parameters:
        flat_img: 1-D sequence of pixel values, used as indices into a
            256-entry lookup table.
        sorted_gray: grey levels; position i corresponds to
            cum_prob[i] .. cum_prob[i + 1].
        cum_prob: cumulative probabilities, one entry more than sorted_gray.
        precision: number of bits of the working interval.

    Returns:
        (low + high) // 2 of the final interval.

    NOTE(review): the renormalisation loop rescales low/high but nothing in
    this function records the bits that are shifted out, so the return value
    reflects only the final interval state. That looks insufficient to
    reconstruct long inputs losslessly -- confirm against the decoder.
    NOTE(review): the original indentation of this block was lost; the
    placement of the two clamping statements inside the while loop is
    inferred -- confirm.
    """
    scale = 2 ** precision
    low = 0
    high = scale - 1
    # Lookup table: grey level -> index into sorted_gray / cum_prob.
    gray_to_idx = np.zeros(256, dtype=np.int32)  # one slot per value 0..255
    for i in range(len(sorted_gray)):
        gray_to_idx[sorted_gray[i]] = i
    for pixel in flat_img:
        idx = gray_to_idx[pixel]
        range_width = high - low + 1
        # Narrow the interval to the sub-range assigned to this symbol.
        new_low = low + int(range_width * cum_prob[idx])
        new_high = low + int(range_width * cum_prob[idx + 1]) - 1
        low, high = new_low, new_high
        # Renormalise while the interval lies entirely in one half.
        while (low >= scale // 2) or (high < scale // 2):
            if low >= scale // 2:
                # Upper half: shift the interval down by half the scale.
                low -= scale // 2
                high -= scale // 2
            else:
                # Lower half: double the interval.
                low <<= 1
                high = (high << 1) + 1
            # Clamp both bounds to [0, scale - 1].
            low = min(max(low, 0), scale - 1)
            high = min(max(high, 0), scale - 1)
    encoded_int = (low + high) // 2
    return encoded_int
# -------------------------- Numba编译的解码函数 --------------------------
@njit(fastmath=True, cache=True)
def numba_decode_core(encoded_int, sorted_gray, cum_prob, total_pixels, precision):
    """
    Core decoding loop, compiled by Numba.

    Mirrors the interval updates of the encoder: for each output pixel the
    position of encoded_int inside [low, high] selects a symbol through
    cum_prob, then the interval and encoded_int are updated together.

    Parameters:
        encoded_int: integer produced by the encoder.
        sorted_gray: grey levels; position i corresponds to
            cum_prob[i] .. cum_prob[i + 1].
        cum_prob: cumulative probabilities, one entry more than sorted_gray.
        total_pixels: number of pixels to emit.
        precision: number of bits of the working interval.

    Returns:
        uint8 array of length total_pixels.

    NOTE(review): when shifting, encoded_int is filled with zero bits since
    no further input is read; whether the output matches the original image
    beyond the first few symbols should be verified.
    NOTE(review): the original indentation of this block was lost; the
    placement of the two clamping statements inside the while loop is
    inferred -- confirm.
    """
    scale = 2 ** precision
    low = 0
    high = scale - 1
    decoded_pixels = np.zeros(total_pixels, dtype=np.uint8)
    for i in range(total_pixels):
        range_width = high - low + 1
        if range_width == 0:
            # Guard the division below against an empty interval.
            range_width = 1
        # Locate the symbol whose cumulative range contains the value
        # (np.searchsorted performs the lookup).
        val = (encoded_int - low) / range_width
        idx = np.searchsorted(cum_prob, val) - 1
        # Keep the index inside the valid symbol range.
        idx = max(min(idx, len(sorted_gray) - 1), 0)
        decoded_pixels[i] = sorted_gray[idx]
        # Narrow the interval exactly as the encoder does.
        new_low = low + int(range_width * cum_prob[idx])
        new_high = low + int(range_width * cum_prob[idx + 1]) - 1
        low, high = new_low, new_high
        # Renormalise, applying the same transformation to encoded_int.
        while (low >= scale // 2) or (high < scale // 2):
            if low >= scale // 2:
                low -= scale // 2
                high -= scale // 2
                encoded_int -= scale // 2
            else:
                low <<= 1
                high = (high << 1) + 1
                # Masking keeps encoded_int within `precision` bits.
                encoded_int = (encoded_int << 1) & (scale - 1)
            low = min(max(low, 0), scale - 1)
            high = min(max(high, 0), scale - 1)
    return decoded_pixels
# -------------------------- 外层封装函数 --------------------------
def arithmetic_encode_numba(image, precision=24):
    """Prepare the probability model and run the Numba encoder core.

    Args:
        image: NumPy array of pixel values; converted to ``uint8``.
        precision: Bit width of the coder's working interval and of the
            returned bit string.

    Returns:
        ``(encoded_str, cum_prob, sorted_gray, compression_ratio)``:
        the encoder output as a zero-padded binary string, the cumulative
        probability table, the grey levels it refers to, and
        original bits / ``len(encoded_str)``.
    """
    flat_img = image.flatten().astype(np.uint8)
    total_pixels = len(flat_img)

    # 1. Symbol probabilities from a single vectorised count.
    unique_gray, counts = np.unique(flat_img, return_counts=True)
    prob = counts / total_pixels
    sorted_gray = np.sort(unique_gray)

    # 2. Cumulative probabilities with a leading zero
    #    (length = number of symbols + 1).
    cum_prob = np.zeros(len(sorted_gray) + 1, dtype=np.float64)
    cum_prob[1:] = np.cumsum(prob)

    # 3. Run the compiled core and render its integer result as bits.
    encoded_int = numba_encode_core(flat_img, sorted_gray, cum_prob, precision)
    encoded_str = bin(encoded_int)[2:].zfill(precision)

    # Ratio of 8-bit source size to the length of the returned bit string.
    original_bits = total_pixels * 8
    compressed_bits = len(encoded_str)
    if compressed_bits > 0:
        compression_ratio = original_bits / compressed_bits
    else:
        compression_ratio = 0
    return encoded_str, cum_prob, sorted_gray, compression_ratio
def arithmetic_decode_numba(encoded_str, cum_prob, sorted_gray, img_shape, precision=24):
    """Run the Numba decoder core and reshape its output into an image.

    Args:
        encoded_str: Binary string produced by the encoder ("" means 0).
        cum_prob: Cumulative probability table used when encoding.
        sorted_gray: Grey levels the table refers to.
        img_shape: Target shape; its first two entries give the pixel count.
        precision: Bit width of the coder's working interval.

    Returns:
        ``uint8`` array of shape ``img_shape``.
    """
    pixel_count = img_shape[0] * img_shape[1]
    if encoded_str:
        encoded_int = int(encoded_str, 2)
    else:
        encoded_int = 0
    # Delegate the actual decoding to the compiled core.
    flat_pixels = numba_decode_core(
        encoded_int, sorted_gray, cum_prob, pixel_count, precision
    )
    return flat_pixels.reshape(img_shape).astype(np.uint8)
# -------------------------- 主函数(带计时对比) --------------------------
if __name__ == "__main__":
    # 1. Load the image and resize it to 512x512 for the timing test.
    img = cv2.imread("../picture/AALi.jpg", cv2.IMREAD_GRAYSCALE)
    if img is None:
        print("图像读取失败!检查路径")
        raise SystemExit(1)
    img = cv2.resize(img, (512, 512))
    print(f"测试图像尺寸:{img.shape},总像素数:{img.size / 10000:.2f}万")

    # 2. Timed encoding.
    start = time.time()
    encoded_str, cum_prob, sorted_gray, cr = arithmetic_encode_numba(img, precision=24)
    encode_time = time.time() - start
    print(f"\nNumba加速编码耗时:{encode_time * 1000:.2f} 毫秒")
    print(f"压缩比:{cr:.2f}")

    # 3. Timed decoding.
    start = time.time()
    decoded_img = arithmetic_decode_numba(encoded_str, cum_prob, sorted_gray, img.shape, precision=24)
    decode_time = time.time() - start
    print(f"Numba加速解码耗时:{decode_time * 1000:.2f} 毫秒")

    # 4. Losslessness check. Both arrays are uint8, so the difference must
    #    be taken in float64; uint8 subtraction and squaring wrap modulo 256
    #    and would under-report the error.
    diff = img.astype(np.float64) - decoded_img.astype(np.float64)
    mse = np.mean(diff ** 2)
    psnr = "∞(无损)" if mse == 0 else f"{10 * np.log10(255 ** 2 / mse):.2f} dB"
    print(f"解码PSNR:{psnr}")

    # 5. Side-by-side comparison.
    plt.figure(figsize=(12, 10))
    panels = [
        (img, "原始图像(512x512)"),
        (decoded_img, f"Numba加速解码(压缩比{cr:.2f})"),
    ]
    for slot, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.imshow(picture, cmap="gray")
        plt.title(caption)
        plt.axis("off")
    plt.tight_layout()
    plt.show()
LZW 编码是无损压缩的经典算法,广泛应用于 GIF、TIFF 等格式(PNG 使用的是 DEFLATE,即 LZ77 + 霍夫曼编码,而不是 LZW),核心思想:动态构建字典,将重复出现的符号序列映射为单个编码。
import cv2
import numpy as np
import matplotlib.pyplot as plt
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# LZW编码
def lzw_encode(image, max_bits=12):
    """LZW-encode the pixels of ``image``.

    Args:
        image: NumPy array with pixel values in 0..255.
        max_bits: Width of one output code in bits. The dictionary stops
            growing once it holds ``2 ** max_bits`` entries, so every
            emitted code really fits in ``max_bits`` bits.

    Returns:
        ``(encoded, lzw_dict, compression_ratio)``: the list of integer
        codes, the final dictionary (tuple of pixels -> code) and
        original bits / (number of codes * ``max_bits``). An image without
        pixels yields ``([], <initial dictionary>, 0.0)``.

    Raises:
        ValueError: If ``max_bits`` is below 8 (the 256 single-pixel
            entries would not fit).
    """
    if max_bits < 8:
        raise ValueError("max_bits不能小于8")

    flat_img = image.flatten().tolist()
    # Initial dictionary: every single grey level maps to itself.
    lzw_dict = {(value,): value for value in range(256)}
    if not flat_img:
        return [], lzw_dict, 0.0

    capacity = 1 << max_bits
    next_code = 256
    encoded = []
    current = (flat_img[0],)
    for pixel in flat_img[1:]:
        # Build the extended sequence once per pixel.
        candidate = current + (pixel,)
        if candidate in lzw_dict:
            current = candidate
            continue
        encoded.append(lzw_dict[current])
        # Only register new sequences while codes still fit in max_bits.
        # A decoder that keeps adding entries stays compatible, because the
        # extra entries it creates are never referenced.
        if next_code < capacity:
            lzw_dict[candidate] = next_code
            next_code += 1
        current = (pixel,)
    # Flush the sequence that is still pending.
    encoded.append(lzw_dict[current])

    original_bits = len(flat_img) * 8
    compressed_bits = len(encoded) * max_bits
    compression_ratio = original_bits / compressed_bits
    return encoded, lzw_dict, compression_ratio
# LZW解码
def lzw_decode(encoded, img_shape):
    """Decode a list of LZW codes back into an image array.

    Args:
        encoded: List of integer codes; must contain at least one code.
        img_shape: Target shape; its first two entries give the pixel count.

    Returns:
        NumPy array of shape ``img_shape``.

    Raises:
        ValueError: If a code is neither in the dictionary nor the next
            code about to be created.
    """
    # Initial dictionary: code -> list with that single grey level.
    table = {value: [value] for value in range(256)}
    next_code = 256

    previous = table[encoded[0]]
    pixels = previous.copy()
    for code in encoded[1:]:
        entry = table.get(code)
        if entry is None:
            if code != next_code:
                raise ValueError("无效的LZW编码")
            # The code refers to the entry being created right now: it is
            # the previous sequence followed by its own first pixel.
            entry = previous + previous[:1]
        pixels += entry
        # Register previous sequence + first pixel of the current one.
        table[next_code] = previous + entry[:1]
        next_code += 1
        previous = entry

    # Cut to the requested pixel count before reshaping.
    pixels = pixels[:img_shape[0] * img_shape[1]]
    return np.array(pixels).reshape(img_shape)
# 主函数
if __name__ == "__main__":
    # Load the grey-level test image.
    img = cv2.imread("test_img.jpg", cv2.IMREAD_GRAYSCALE)
    if img is None:
        print("请确保当前目录下有test_img.jpg文件!")
        raise SystemExit(1)

    # LZW encoding.
    encoded, lzw_dict, compression_ratio = lzw_encode(img)
    print(f"LZW编码压缩比:{compression_ratio:.2f}")

    # LZW decoding.
    decoded_img = lzw_decode(encoded, img.shape)

    # PSNR. LZW is lossless, so MSE == 0 is the normal outcome and must not
    # be divided by; the difference is taken in float64 to avoid integer
    # wraparound.
    diff = img.astype(np.float64) - decoded_img.astype(np.float64)
    mse = np.mean(diff ** 2)
    psnr = float("inf") if mse == 0 else 10 * np.log10((255 ** 2) / mse)
    print(f"解码后PSNR:{psnr:.2f} dB")

    # Side-by-side comparison.
    plt.figure(figsize=(12, 6))
    panels = [
        (img, "原始图像"),
        (decoded_img, f"LZW解码后图像(压缩比:{compression_ratio:.2f})"),
    ]
    for slot, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.imshow(picture, cmap="gray")
        plt.title(caption)
        plt.axis("off")
    plt.tight_layout()
    plt.show()
行程编码适用于有大量连续重复像素的图像(如二值图像、卡通图像),核心:将连续的相同像素表示为 “长度 + 值”。
import cv2
import numpy as np
import matplotlib.pyplot as plt
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 行程编码
def rle_encode(image, max_run=255):
    """Run-length encode the pixels of ``image``.

    Args:
        image: NumPy array of 8-bit pixel values.
        max_run: Longest run stored in a single pair. Longer runs are split
            into several pairs so that each count fits in
            ``max_run.bit_length()`` bits (8 bits for the default 255).

    Returns:
        ``(encoded, compression_ratio)`` where ``encoded`` is a list of
        ``(value, count)`` pairs and the ratio is original bits /
        (pairs * bits per pair). An image without pixels yields
        ``([], 0.0)``.

    Raises:
        ValueError: If ``max_run`` is smaller than 1.
    """
    if max_run < 1:
        raise ValueError("max_run必须为正整数")

    flat_img = image.flatten()
    if flat_img.size == 0:
        # Same 2-tuple shape as the normal return so callers can unpack it.
        return [], 0.0

    # Indices where a new run starts, and the length of every run.
    boundaries = np.flatnonzero(flat_img[1:] != flat_img[:-1]) + 1
    starts = np.concatenate(([0], boundaries))
    lengths = np.diff(np.concatenate((starts, [flat_img.size])))

    encoded = []
    for value, length in zip(flat_img[starts], lengths.tolist()):
        # Split runs that exceed what one count field can hold.
        full_runs, remainder = divmod(length, max_run)
        encoded.extend([(value, max_run)] * full_runs)
        if remainder:
            encoded.append((value, remainder))

    original_bits = flat_img.size * 8
    # 8 bits for the value plus enough bits for the largest count.
    bits_per_pair = 8 + int(max_run).bit_length()
    compression_ratio = original_bits / (len(encoded) * bits_per_pair)
    return encoded, compression_ratio
# 行程解码
def rle_decode(encoded, img_shape):
    """Expand ``(value, count)`` pairs back into an image array.

    Args:
        encoded: Iterable of ``(value, count)`` pairs.
        img_shape: Target shape; its first two entries give the pixel count.

    Returns:
        NumPy array of shape ``img_shape``.
    """
    # Repeat every value `count` times, in order.
    pixels = [value for value, count in encoded for _ in range(count)]
    # Drop anything beyond the requested pixel count.
    wanted = img_shape[0] * img_shape[1]
    return np.array(pixels[:wanted]).reshape(img_shape)
# 主函数
if __name__ == "__main__":
    # Load the image; the failure check must come before any OpenCV call
    # that uses it, otherwise cv2.threshold raises on None and the friendly
    # message is never shown.
    img = cv2.imread("test_img.jpg", cv2.IMREAD_GRAYSCALE)
    if img is None:
        print("请确保当前目录下有test_img.jpg文件!")
        raise SystemExit(1)

    # Binarise the image: long runs of identical pixels suit RLE best.
    _, binary_img = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY)

    # RLE encoding.
    encoded, compression_ratio = rle_encode(binary_img)
    print(f"行程编码压缩比:{compression_ratio:.2f}")

    # RLE decoding.
    decoded_img = rle_decode(encoded, binary_img.shape)

    # Side-by-side comparison.
    plt.figure(figsize=(12, 6))
    panels = [
        (binary_img, "原始二值图像"),
        (decoded_img, f"RLE解码后图像(压缩比:{compression_ratio:.2f})"),
    ]
    for slot, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.imshow(picture, cmap="gray")
        plt.title(caption)
        plt.axis("off")
    plt.tight_layout()
    plt.show()
基于符号的编码是泛称,核心是将图像中的重复 “符号”(像素块、特征)映射为编码,本质是行程编码、LZW 编码的扩展,代码可参考 LZW 编码(将单个像素改为像素块即可)。
位平面编码将图像每个像素的二进制位拆分到不同的位平面(如 8 位灰度图拆分为 8 个位平面)。高位平面承载了图像的主要视觉信息,应予以保留;低位平面更接近噪声,可以压缩或直接去除。
import cv2
import numpy as np
import matplotlib.pyplot as plt
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 拆分位平面
def split_bit_planes(image):
    """Split an 8-bit image into its eight bit planes.

    Args:
        image: Integer NumPy array.

    Returns:
        List of eight arrays; entry ``i`` holds bit ``i`` of every pixel
        (0 = least significant, 7 = most significant) scaled to 0/255 so it
        can be displayed directly.
    """
    return [((image >> bit) & 1) * 255 for bit in range(8)]
# 合并位平面
def merge_bit_planes(bit_planes, keep_planes=4):
    """Rebuild an image from its most significant bit planes.

    Args:
        bit_planes: Eight 0/255 planes, index 0 being the least significant.
        keep_planes: Number of planes to keep, counted from bit 7 downwards.

    Returns:
        ``uint8`` image in which the discarded low bits are zero.
    """
    merged = np.zeros_like(bit_planes[0], dtype=np.uint8)
    # Walk from the top bit down through the planes that are kept.
    for bit in range(7, 7 - keep_planes, -1):
        plane_bits = bit_planes[bit] // 255  # back to 0/1
        merged += plane_bits << bit
    return merged
# 主函数
if __name__ == "__main__":
    # Load the grey-level test image.
    img = cv2.imread("test_img.jpg", cv2.IMREAD_GRAYSCALE)
    if img is None:
        print("请确保当前目录下有test_img.jpg文件!")
        raise SystemExit(1)

    # Number of most significant planes to keep; the compression ratio and
    # the plot title are derived from it instead of being hard-coded.
    keep_planes = 4

    # Split into bit planes and merge the kept ones back together.
    bit_planes = split_bit_planes(img)
    merged_img = merge_bit_planes(bit_planes, keep_planes=keep_planes)

    # Keeping k of 8 bits per pixel gives a ratio of 8 / k.
    compression_ratio = 8 / keep_planes
    print(f"位平面编码压缩比:{compression_ratio:.2f}")

    # PSNR, computed in float64: squaring a uint8 difference overflows as
    # soon as the difference exceeds 15.
    diff = img.astype(np.float64) - merged_img.astype(np.float64)
    mse = np.mean(diff ** 2)
    psnr = float("inf") if mse == 0 else 10 * np.log10((255 ** 2) / mse)
    print(f"合并后PSNR:{psnr:.2f} dB")

    # Original, top bit plane and merged result next to each other.
    plt.figure(figsize=(15, 8))
    panels = [
        (img, "原始图像"),
        (bit_planes[7], "最高位平面(第7位)"),
        (merged_img, f"保留{keep_planes}个位平面(压缩比:{compression_ratio:.2f},PSNR:{psnr:.2f}dB)"),
    ]
    for slot, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(1, 3, slot)
        plt.imshow(picture, cmap="gray")
        plt.title(caption)
        plt.axis("off")
    plt.tight_layout()
    plt.show()
块变换编码是 JPEG 的核心,常用 DCT(离散余弦变换)将图像从空间域转为频率域,对高频分量(细节)量化压缩。
import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy.fftpack import dct, idct
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 8x8 DCT变换
def dct_2d(block):
    """Return the orthonormal 2-D DCT of ``block``.

    The 1-D transform is applied along the first axis and then along the
    second axis.
    """
    along_columns = dct(block, axis=0, norm='ortho')
    return dct(along_columns, axis=1, norm='ortho')
# 8x8 IDCT变换
def idct_2d(block):
    """Return the orthonormal 2-D inverse DCT of ``block``.

    The 1-D inverse transform is applied along the first axis and then
    along the second axis.
    """
    along_columns = idct(block, axis=0, norm='ortho')
    return idct(along_columns, axis=1, norm='ortho')
# JPEG风格DCT压缩
def jpeg_dct_compress(image, quality=50):
    """Quantise the 8x8 block DCT of ``image`` in the style of JPEG.

    Args:
        image: 2-D array of 8-bit grey levels.
        quality: Quality factor, clamped to 1..100. 50 uses the standard
            luminance table unchanged; higher values quantise more finely.

    Returns:
        ``(compressed, quant_matrix, compression_ratio, (h, w))``:
        the quantised coefficients of the zero-padded image (float32, same
        layout as the padded image), the quantisation table that was used,
        a rough ratio (coefficient count / non-zero coefficient count) and
        the original image size.
    """
    quality = min(max(quality, 1), 100)
    scale = 50 / quality if quality < 50 else 2 - quality / 50

    # Standard JPEG luminance quantisation table.
    base_table = np.array([
        [16, 11, 10, 16, 24, 40, 51, 61],
        [12, 12, 14, 19, 26, 58, 60, 55],
        [14, 13, 16, 24, 40, 57, 69, 56],
        [14, 17, 22, 29, 51, 87, 80, 62],
        [18, 22, 37, 56, 68, 109, 103, 77],
        [24, 35, 55, 64, 81, 104, 113, 92],
        [49, 64, 78, 87, 103, 121, 120, 101],
        [72, 92, 95, 98, 112, 100, 103, 99],
    ])
    # Quality 100 makes scale 0; a zero table would divide by zero and fill
    # the output with inf/nan. JPEG limits every step to at least 1.
    quant_matrix = np.maximum(base_table * scale, 1.0)

    # Zero-pad both dimensions up to a multiple of 8.
    h, w = image.shape
    img_padded = np.pad(image, ((0, -h % 8), (0, -w % 8)), mode='constant')
    h_new, w_new = img_padded.shape

    # View the image as a grid of 8x8 blocks (axes: block row, row in
    # block, block column, column in block) and transform all blocks at
    # once instead of looping over them in Python.
    centred = img_padded.astype(np.float32) - 128  # centre around zero
    blocks = centred.reshape(h_new // 8, 8, w_new // 8, 8)
    coeffs = dct(dct(blocks, axis=1, norm='ortho'), axis=3, norm='ortho')
    quantised = np.round(coeffs / quant_matrix[None, :, None, :])
    compressed = quantised.reshape(h_new, w_new).astype(np.float32)

    # Rough ratio only. A completely flat image quantises to all zeros, so
    # count at least one coefficient to avoid dividing by zero.
    non_zero = np.count_nonzero(compressed)
    total = h_new * w_new
    compression_ratio = total / max(non_zero, 1)
    return compressed, quant_matrix, compression_ratio, (h, w)
# DCT解压
def jpeg_dct_decompress(compressed, quant_matrix, original_shape):
    """Reconstruct an image from quantised 8x8 block DCT coefficients.

    Args:
        compressed: 2-D array of quantised coefficients whose height and
            width are multiples of 8.
        quant_matrix: 8x8 quantisation table used when compressing.
        original_shape: ``(height, width)`` to crop the result to.

    Returns:
        ``uint8`` image of shape ``original_shape``.
    """
    h_new, w_new = compressed.shape
    # Axes: block row, row in block, block column, column in block.
    blocks = np.asarray(compressed, dtype=np.float64).reshape(
        h_new // 8, 8, w_new // 8, 8
    )
    # De-quantise, inverse-transform every block at once, undo centring.
    coeffs = blocks * np.asarray(quant_matrix)[None, :, None, :]
    pixels = idct(idct(coeffs, axis=1, norm='ortho'), axis=3, norm='ortho') + 128

    restored = pixels.reshape(h_new, w_new)
    restored = restored[:original_shape[0], :original_shape[1]]
    # Round before casting: a plain uint8 cast truncates, which darkens the
    # image by half a level on average and turns 135.9999 into 135.
    return np.clip(np.rint(restored), 0, 255).astype(np.uint8)
# 主函数
if __name__ == "__main__":
    # Load the grey-level test image.
    img = cv2.imread("test_img.jpg", cv2.IMREAD_GRAYSCALE)
    if img is None:
        print("请确保当前目录下有test_img.jpg文件!")
        raise SystemExit(1)

    # DCT compression with quality factor 50.
    compressed, quant_matrix, compression_ratio, original_shape = jpeg_dct_compress(img, quality=50)
    print(f"DCT编码压缩比:{compression_ratio:.2f}")

    # DCT decompression.
    decoded_img = jpeg_dct_decompress(compressed, quant_matrix, original_shape)

    # PSNR. Both images are uint8, so the difference is taken in float64;
    # uint8 subtraction and squaring wrap modulo 256 and distort the MSE.
    diff = img.astype(np.float64) - decoded_img.astype(np.float64)
    mse = np.mean(diff ** 2)
    psnr = float("inf") if mse == 0 else 10 * np.log10((255 ** 2) / mse)
    print(f"解压后PSNR:{psnr:.2f} dB")

    # Side-by-side comparison.
    plt.figure(figsize=(12, 6))
    panels = [
        (img, "原始图像"),
        (decoded_img, f"DCT解压后图像(质量50,压缩比:{compression_ratio:.2f},PSNR:{psnr:.2f}dB)"),
    ]
    for slot, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.imshow(picture, cmap="gray")
        plt.title(caption)
        plt.axis("off")
    plt.tight_layout()
    plt.show()
预测编码基于 “相邻像素相似” 的特性,用已编码像素预测当前像素,仅编码预测误差,核心是 DPCM(差分脉冲编码调制)。
import cv2
import numpy as np
import matplotlib.pyplot as plt
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# DPCM预测编码(左邻域预测)
def dpcm_encode(image):
    """DPCM-encode ``image`` by predicting each pixel from its left neighbour.

    The first row and the first column are stored unchanged; every other
    pixel is replaced by the signed difference to the pixel on its left.

    Args:
        image: 2-D array of 8-bit grey levels.

    Returns:
        ``(encoded, compression_ratio)``. ``encoded`` is an ``int16`` array
        of the same shape. The ratio compares 8 bits per pixel with a
        fixed-length code that spends 8 bits on each unpredicted pixel and
        (magnitude bits + 1 sign bit) on each prediction error; it is 1.0
        when there is nothing to predict.
    """
    # Work in int16: subtracting uint8 values wraps around, turning an
    # error of -10 into 246 and inflating the error range.
    pixels = image.astype(np.int16)
    encoded = pixels.copy()
    encoded[1:, 1:] = pixels[1:, 1:] - pixels[1:, :-1]

    residuals = encoded[1:, 1:]
    if residuals.size == 0:
        return encoded, 1.0

    # Bits for the largest magnitude, plus one for the sign.
    max_abs = int(np.abs(residuals).max())
    residual_bits = max_abs.bit_length() + 1

    # Only the predicted pixels benefit; the first row and column are raw.
    raw_count = encoded.size - residuals.size
    encoded_bits = raw_count * 8 + residuals.size * residual_bits
    compression_ratio = (encoded.size * 8) / encoded_bits
    return encoded, compression_ratio
# DPCM解码
def dpcm_decode(encoded):
    """Rebuild an image from left-neighbour DPCM data.

    Args:
        encoded: 2-D integer array whose first row holds raw pixels and
            whose remaining rows hold a raw first pixel followed by
            differences to the left neighbour.

    Returns:
        ``uint8`` image of the same shape.
    """
    h, w = encoded.shape
    decoded = np.zeros_like(encoded, dtype=np.uint8)
    # The first row was stored without prediction.
    decoded[0, :] = encoded[0, :]
    if h > 1:
        # Each later pixel is the running sum of its row up to that column.
        # The result is kept modulo 256, matching 8-bit storage.
        running = np.cumsum(encoded[1:, :], axis=1)
        decoded[1:, :] = (running % 256).astype(np.uint8)
    # Keep the values in the valid 8-bit range.
    return np.clip(decoded, 0, 255)
# 主函数
if __name__ == "__main__":
    # Load the grey-level test image.
    img = cv2.imread("test_img.jpg", cv2.IMREAD_GRAYSCALE)
    if img is None:
        print("请确保当前目录下有test_img.jpg文件!")
        raise SystemExit(1)

    # DPCM encoding.
    encoded, compression_ratio = dpcm_encode(img)
    print(f"DPCM编码压缩比:{compression_ratio:.2f}")

    # DPCM decoding.
    decoded_img = dpcm_decode(encoded)

    # PSNR. The difference is taken in float64 because both images are
    # uint8, and MSE == 0 (exact reconstruction) is reported as infinite
    # PSNR instead of dividing by zero.
    diff = img.astype(np.float64) - decoded_img.astype(np.float64)
    mse = np.mean(diff ** 2)
    psnr = float("inf") if mse == 0 else 10 * np.log10((255 ** 2) / mse)
    print(f"解码后PSNR:{psnr:.2f} dB")

    # Side-by-side comparison.
    plt.figure(figsize=(12, 6))
    panels = [
        (img, "原始图像"),
        (decoded_img, f"DPCM解码后图像(压缩比:{compression_ratio:.2f},PSNR:{psnr:.2f}dB)"),
    ]
    for slot, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.imshow(picture, cmap="gray")
        plt.title(caption)
        plt.axis("off")
    plt.tight_layout()
    plt.show()
小波编码是 JPEG2000 的核心,相比 DCT(块效应),小波变换具有多分辨率特性,压缩效果更好。
import cv2
import numpy as np
import matplotlib.pyplot as plt
import pywt # 需要安装:pip install PyWavelets
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 小波编码压缩
def wavelet_compress(image, level=2, threshold=20):
    """Threshold the detail coefficients of a 2-D wavelet decomposition.

    Args:
        image: 2-D image array.
        level: Number of decomposition levels passed to ``pywt.wavedec2``.
        threshold: Detail coefficients with a smaller magnitude are set to 0.

    Returns:
        ``(coeffs_quant, compression_ratio)``: the coefficient list with
        thresholded detail bands (the approximation band is untouched) and
        total coefficient count / non-zero coefficient count.
    """
    # Multi-level decomposition with the 'db1' wavelet.
    coeffs = pywt.wavedec2(image, 'db1', level=level)

    # Entry 0 is the approximation band; every later entry is a tuple of
    # detail bands, which is where small coefficients are removed.
    coeffs_quant = [coeffs[0]]
    for detail_bands in coeffs[1:]:
        coeffs_quant.append(
            tuple(np.where(np.abs(band) < threshold, 0, band) for band in detail_bands)
        )

    # Ratio of all coefficients to those that survive the threshold.
    total = sum(np.size(entry) for entry in coeffs)
    non_zero = sum(np.count_nonzero(entry) for entry in coeffs_quant)
    compression_ratio = total / non_zero
    return coeffs_quant, compression_ratio
# 小波解码
def wavelet_decompress(coeffs_quant, level=2):
    """Reconstruct an 8-bit image from wavelet coefficients.

    Args:
        coeffs_quant: Coefficient list in the layout produced by
            ``pywt.wavedec2`` with the 'db1' wavelet.
        level: Unused; the number of levels follows from ``coeffs_quant``.
            Kept so existing callers keep working.

    Returns:
        ``uint8`` image.
    """
    decoded_img = pywt.waverec2(coeffs_quant, 'db1')
    # Round before casting: a plain uint8 cast truncates, so a value that
    # comes back as 99.9999 would become 99 and every pixel would be
    # biased downwards.
    return np.clip(np.rint(decoded_img), 0, 255).astype(np.uint8)
# 主函数
if __name__ == "__main__":
    # Load the grey-level test image.
    img = cv2.imread("test_img.jpg", cv2.IMREAD_GRAYSCALE)
    if img is None:
        print("请确保当前目录下有test_img.jpg文件!")
        raise SystemExit(1)

    # Wavelet compression.
    coeffs_quant, compression_ratio = wavelet_compress(img, level=2, threshold=20)
    print(f"小波编码压缩比:{compression_ratio:.2f}")

    # Wavelet reconstruction, cropped to the original size because the
    # transform can return a slightly larger array.
    decoded_img = wavelet_decompress(coeffs_quant, level=2)
    decoded_img = decoded_img[:img.shape[0], :img.shape[1]]

    # PSNR. Both images are uint8, so the difference is taken in float64;
    # uint8 subtraction and squaring wrap modulo 256 and distort the MSE.
    diff = img.astype(np.float64) - decoded_img.astype(np.float64)
    mse = np.mean(diff ** 2)
    psnr = float("inf") if mse == 0 else 10 * np.log10((255 ** 2) / mse)
    print(f"解码后PSNR:{psnr:.2f} dB")

    # Side-by-side comparison.
    plt.figure(figsize=(12, 6))
    panels = [
        (img, "原始图像"),
        (decoded_img, f"小波解码后图像(压缩比:{compression_ratio:.2f},PSNR:{psnr:.2f}dB)"),
    ]
    for slot, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.imshow(picture, cmap="gray")
        plt.title(caption)
        plt.axis("off")
    plt.tight_layout()
    plt.show()
数字图像水印是在压缩 / 未压缩图像中嵌入不可见的标识信息,用于版权保护。核心是将水印信息嵌入到图像变换域的中低频系数:过低的频率(如直流分量)改动后容易被人眼察觉,过高的频率又容易在压缩时丢失;下面的示例嵌入在每个 8×8 DCT 块的中频系数 (4, 4) 上。
import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy.fftpack import dct, idct
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 生成水印(二进制序列)
def generate_watermark(shape, seed=123):
    """Generate a reproducible random binary watermark.

    Args:
        shape: Shape of the watermark array.
        seed: Seed of the generator; the same seed gives the same watermark.

    Returns:
        Integer array of zeros and ones with the requested shape.
    """
    # A private generator yields the same sequence as seeding the global
    # one, without resetting random state used elsewhere in the program.
    rng = np.random.RandomState(seed)
    return rng.randint(0, 2, shape)
# 嵌入DCT域水印
def _watermark_block_dct(image):
    """Zero-pad ``image`` to multiples of 8 and DCT every 8x8 block.

    Returns ``(coeffs, padded_shape)``; ``coeffs`` has the axes
    (block row, row in block, block column, column in block).
    """
    h, w = image.shape
    padded = np.pad(image, ((0, -h % 8), (0, -w % 8)), mode='constant')
    h_new, w_new = padded.shape
    centred = padded.astype(np.float32) - 128
    blocks = centred.reshape(h_new // 8, 8, w_new // 8, 8)
    coeffs = dct(dct(blocks, axis=1, norm='ortho'), axis=3, norm='ortho')
    return coeffs, (h_new, w_new)


def _watermark_cell_index(block_rows, block_cols, watermark_shape):
    """Map every block position to the watermark cell it carries (tiled)."""
    rows = np.arange(block_rows) % watermark_shape[0]
    cols = np.arange(block_cols) % watermark_shape[1]
    return np.ix_(rows, cols)


# Embed a watermark in the DCT domain
def embed_watermark(image, watermark, alpha=0.1):
    """Embed a binary watermark, one bit per 8x8 block.

    Block ``(r, c)`` carries ``watermark[r % H, c % W]``, so the watermark
    is tiled over the grid of blocks. The bit is added to the block's
    mid-frequency DCT coefficient (4, 4) as ``+alpha * 255`` for a one and
    ``-alpha * 255`` for a zero.

    Previously the watermark was tiled per pixel and only the pixel at each
    block's top-left corner was used, so most watermark bits were never
    embedded.

    Args:
        image: 2-D array of 8-bit grey levels.
        watermark: 2-D array of zeros and ones.
        alpha: Embedding strength as a fraction of 255.

    Returns:
        ``uint8`` watermarked image with the shape of ``image``.
    """
    h, w = image.shape
    coeffs, (h_new, w_new) = _watermark_block_dct(image)

    mark = np.asarray(watermark)
    cells = _watermark_cell_index(h_new // 8, w_new // 8, mark.shape)
    signs = 2 * mark[cells].astype(np.float32) - 1  # 0/1 -> -1/+1

    # coeffs[:, 4, :, 4] is a view holding coefficient (4, 4) of every block.
    coeffs[:, 4, :, 4] += alpha * signs * 255

    restored = idct(idct(coeffs, axis=1, norm='ortho'), axis=3, norm='ortho') + 128
    watermarked = restored.reshape(h_new, w_new)[:h, :w]
    # Round instead of truncating so the change survives the uint8 cast.
    return np.clip(np.rint(watermarked), 0, 255).astype(np.uint8)


# Extract a watermark from the DCT domain
def extract_watermark(image, watermark_shape, alpha=0.1):
    """Recover the watermark embedded by ``embed_watermark``.

    Each block votes for its watermark cell with the sign of DCT
    coefficient (4, 4); a cell is 1 when more than half of its blocks vote
    1. Cells that no block maps to (image smaller than the watermark) and
    ties are reported as 0.

    Args:
        image: 2-D watermarked image.
        watermark_shape: ``(height, width)`` of the embedded watermark.
        alpha: Unused by this sign-based detector; kept for interface
            compatibility.

    Returns:
        ``uint8`` array of shape ``watermark_shape`` with zeros and ones.
    """
    coeffs, (h_new, w_new) = _watermark_block_dct(image)
    block_bits = (coeffs[:, 4, :, 4] > 0).astype(np.int64)

    out_shape = (watermark_shape[0], watermark_shape[1])
    cells = _watermark_cell_index(h_new // 8, w_new // 8, out_shape)

    # Accumulate the votes of all blocks that share a watermark cell.
    ones = np.zeros(out_shape, dtype=np.int64)
    votes = np.zeros(out_shape, dtype=np.int64)
    np.add.at(ones, cells, block_bits)
    np.add.at(votes, cells, 1)
    return (2 * ones > votes).astype(np.uint8)
# 主函数
if __name__ == "__main__":
    # Load the grey-level test image.
    img = cv2.imread("test_img.jpg", cv2.IMREAD_GRAYSCALE)
    if img is None:
        print("请确保当前目录下有test_img.jpg文件!")
        exit()

    # Create a 32x32 binary watermark.
    watermark_shape = (32, 32)
    watermark = generate_watermark(watermark_shape)

    # Embed it, then extract it again from the watermarked image.
    watermarked_img = embed_watermark(img, watermark, alpha=0.1)
    extracted_watermark = extract_watermark(watermarked_img, watermark_shape, alpha=0.1)

    # 2x2 overview: original, watermarked image, original and extracted
    # watermark.
    plt.figure(figsize=(15, 10))
    panels = [
        (img, "原始图像"),
        (watermarked_img, "嵌入水印后的图像"),
        (watermark, "原始水印"),
        (extracted_watermark, "提取的水印"),
    ]
    for slot, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(2, 2, slot)
        plt.imshow(picture, cmap="gray")
        plt.title(caption)
        plt.axis("off")
    plt.tight_layout()
    plt.show()

运行前请先安装依赖:pip install opencv-python numpy matplotlib scipy pywavelets numba(numba 仅算术编码示例需要),并在代码所在目录下准备一张名为 test_img.jpg 的测试图像(戈伦布编码和算术编码示例使用了其他路径,请改成你自己的图像路径)。希望本文能帮助你理解图像压缩的核心知识,建议动手调试参数(如 DCT 质量因子、小波阈值),直观感受不同参数对压缩效果的影响!