python-3.x - python中的图像压缩
问题描述
对于我的图像压缩,我使用枕头库来获取 rgb 中的每个像素(例如:(100、0、200)。使用霍夫曼编码我已经转换为二进制以减少位数。现在,我有将位序列保存到文本或二进制文件中。压缩文件始终小于原始文件,但现在,我的 txt 文件大于原始文件。我该怎么办?之后我该如何读取文件并解压。这里是一个指令:
您的代码应该读入图像文件,计算固定长度编码需要多少位,然后应用压缩算法来创建更小的编码——您需要实现压缩,不能使用压缩库。您应该输出以压缩格式存储图像所需的位数以及所达到的压缩率。在保存压缩图像时,您将无法将其保存为标准图像格式,因为您将创建自己的编码,但您可以将位序列保存到文本或二进制文件中。
您的代码还应该能够提示用户输入包含压缩位序列的文本文件的文件名,然后将该文件解压缩为原始图像 - 您可以假设该文件使用与您压缩的最后一个文件相同的压缩格式. 因此,例如,如果您将 pacificat.bmp 压缩为存储在 pacificat.txt 中的一系列位,然后用户要求您解压缩 alt_encode.txt,您可以假设 alt_pacificat.txt 使用与 encode.txt 相同的压缩数据结构(例如,它可能是来自原始图像的数据的子集)。
有许多库可以帮助您将格式化数据从 Python 存储到文件中。如果您研究选项并找到将压缩数据结构存储到文件中的方法,以便用户可以选择位文件和数据结构文件并使用数据结构解压缩位文件
只需使用我当前的图像:flag2.bmp
这是我的代码
from PIL import Image
import sys, string
import copy
import time
codes = {}
def sortFreq (freqs) :
letters = freqs.keys()
tuples = []
for let in letters :
tuples.append((freqs[let],let))
tuples.sort()
return tuples
def buildTree(tuples) :
while len(tuples) > 1 :
leastTwo = tuple(tuples[0:2]) # get the 2 to combine
theRest = tuples[2:] # all the others
combFreq = leastTwo[0][0] + leastTwo[1][0] # the branch points freq
tuples = theRest + [(combFreq,leastTwo)] # add branch point to the end
tuples.sort() # sort it into place
return tuples[0] # Return the single tree inside the list
def trimTree (tree) :
# Trim the freq counters off, leaving just the letters
p = tree[1] # ignore freq count in [0]
if type(p) == type("") : return p # if just a leaf, return it
else : return (trimTree(p[0]), trimTree(p[1])) # trim left then right and recombine
def assignCodes(node, pat=''):
global codes
if type(node) == type("") :
codes[node] = pat # A leaf. set its code
else : #
assignCodes(node[0], pat+"0") # Branch point. Do the left branch
assignCodes(node[1], pat+"1") # then do the right branch.
start = time.time()
dictionary = {}
table = {}
image = Image.open('flag2.bmp')
#image.show()
width, height = image.size
px= image.load()
totalpixel = width*height
print("Total pixel: "+ str(totalpixel))
for x in range(width):
for y in range(height):
# print(px[x, y])
for i in range(3):
if dictionary.get(str(px[x, y][i])) is None:
dictionary[str(px[x, y][i])] = 1
else:
dictionary[str(px[x, y][i])] = dictionary[str(px[x, y][i])] +1
table = copy.deepcopy(dictionary)
def encode2 (str) :
global codes
output = ""
for ch in str : output += codes[ch]
return output
def decode (tree, str) :
output = ""
p = tree
for bit in str :
if bit == '0' : p = p[0] # Head up the left branch
else : p = p[1] # or up the right branch
if type(p) == type("") :
output += p # found a character. Add to output
p = tree # and restart for next character
return output
combination = len(dictionary)
for value in table:
table[value] = table[value] / (totalpixel * combination) * 100
print(table)
print(dictionary)
sortdic = sortFreq(dictionary)
tree = buildTree(sortdic)
print("tree")
print(tree)
trim = trimTree(tree)
print("trim")
print(trim)
print("assign 01")
assignCodes(trim)
print(codes)
empty_tuple = ()
f = open("answer.txt","w")
for x in range(width):
for y in range(height):
list = []
list.append(codes[str(px[x, y][0])])
list.append(codes[str(px[x, y][1])])
list.append(codes[str(px[x, y][2])])
print(str(px[x, y]) + ": " +str(list))
f.write(str(list))
print("decode test:", str(decode (trim, "1100")))
stop = time.time()
times = (stop - start) * 1000
print("Run time takes %d miliseconds" % times)
[flag2.bmp][1]
解决方案
代码清理
让我们尝试稍微重构您的代码,利用 Python 标准库提供的算法,同时保持 Huffman 树计算和图像编码方法的精神。
计算符号计数
首先,我们可以将符号计数重构为一个函数,并以更简洁的方式重写:
- 用于
Image.getdata()
遍历图像中的所有像素 - 由于每个像素都由一个元组表示,因此可以
itertools.chain.from_iterable
用来获得强度的平面视图。 - 利用
collections.Counter
获取符号(强度计数)
此外,我们可以将其更改为返回(symbol, count)
按 升序排序的 列表(count, symbol)
。为此,我们可以将它与您的sortFreq(...)
函数的重写版本结合起来,利用:
- Python
sorted(...)
函数(它允许我们定义排序依据),以及 - 元组切片以反转
(symbol, count)
元组以进行排序
执行:
from collections import Counter
from itertools import chain
def count_symbols(image):
pixels = image.getdata()
values = chain.from_iterable(pixels)
counts = Counter(values).items()
return sorted(counts, key=lambda x:x[::-1])
构建树
这里只需要做一个小的改变——因为我们已经对符号计数进行了排序,我们只需要反转元组来让你现有的树构建算法工作。我们可以使用列表推导和元组切片来简洁地表达这一点。
执行:
def build_tree(counts) :
nodes = [entry[::-1] for entry in counts] # Reverse each (symbol,count) tuple
while len(nodes) > 1 :
leastTwo = tuple(nodes[0:2]) # get the 2 to combine
theRest = nodes[2:] # all the others
combFreq = leastTwo[0][0] + leastTwo[1][0] # the branch points freq
nodes = theRest + [(combFreq, leastTwo)] # add branch point to the end
nodes.sort() # sort it into place
return nodes[0] # Return the single tree inside the list
修剪树
同样,与您的原始实现相比,只有两个小改动:
- 更改测试以检查
tuple
(节点),以独立于符号的表示方式。 - 摆脱不必要的
else
执行:
def trim_tree(tree) :
p = tree[1] # Ignore freq count in [0]
if type(p) is tuple: # Node, trim left then right and recombine
return (trim_tree(p[0]), trim_tree(p[1]))
return p # Leaf, just return it
分配代码
这里最重要的变化是消除了对全局codes
变量的依赖。为了解决这个问题,我们可以将实现拆分为两个函数,一个处理递归代码分配,一个包装器创建一个新的本地codes
字典,在其上调度递归函数,并返回输出。
让我们也将代码的表示形式从字符串转换为位列表(范围内的整数[0,1]
) - 稍后将清楚其用处。
再一次,我们将更改测试以检查tuple
s(出于与修剪时相同的原因)。
执行:
def assign_codes_impl(codes, node, pat):
if type(node) == tuple:
assign_codes_impl(codes, node[0], pat + [0]) # Branch point. Do the left branch
assign_codes_impl(codes, node[1], pat + [1]) # then do the right branch.
else:
codes[node] = pat # A leaf. set its code
def assign_codes(tree):
codes = {}
assign_codes_impl(codes, tree, [])
return codes
编码
让我们绕个小弯,谈谈数据的编码。
首先,让我们观察原始 RGB 像素由 3 个字节表示(每个颜色通道一个字节。即每像素 24 位,形成我们的基线。
现在,您当前的算法将第一个像素编码为以下 ASCII 字符串:
['000', '0010', '0011']
总共 23 个字节(或 184 位)。这比生的要糟糕得多。让我们来看看为什么:
- 有两个空格,只是让人们更容易阅读。那些没有任何信息。(2 个字节)
- 三个代码中的每一个都由两个撇号分隔。由于代码仅由
0
s 和1
s 组成,因此解析不需要撇号,因此也不携带任何信息。(6 个字节) - 每个代码都是前缀代码,因此可以明确解析,因此用于代码分隔的两个逗号也是不必要的。(2 个字节)
- 我们知道每个像素有三个代码,因此我们也不需要大括号 (
[
,]
) 来分隔像素(出于与上述相同的原因)。(2 个字节)
总的来说,每个像素有 12 个字节,根本不携带任何信息。剩下的 11 个字节(在这种特殊情况下)确实携带了一些信息……但是有多少呢?
请注意,输出字母表中仅有的两个可能符号是0
和1
。这意味着每个符号携带 1 位信息。由于您将每个符号存储为 ASCII 字符(一个字节),因此每 1 位信息使用 8 位。
综上所述,在这种特殊情况下,您使用 184 位来表示 11 位信息——比必要的多约 16.7 倍,比仅以原始格式存储像素差约 7.67 倍。
显然,使用编码数据的简单文本表示不会产生任何压缩。我们将需要更好的方法。
比特流
从我们之前的分析中可以看出,为了有效地执行压缩(和解压缩),我们需要能够将输出(或输入)视为单个比特流。标准 Python 库没有提供直接的解决方案来做到这一点——在最低粒度下,我们一次只能读取或写入一个字节的文件。
由于我们要对可能包含多个位的值进行编码,因此必须根据重要性对它们进行排序进行解码。让我们从最重要到最不重要对它们进行排序。
位 I/O 实用程序
如前所述,我们将位序列表示为范围内的整数列表[0,1]
。让我们从编写一些简单的实用函数开始:
- 将整数转换为唯一表示它的最短位序列的函数(即至少 1 位,否则没有前导零)。
- 将位序列转换为整数的函数。
- 对位序列(以允许固定长度编码)进行零扩展(将零添加到最高有效位置)的函数。
执行:
def to_binary_list(n):
"""Convert integer into a list of bits"""
return [n] if (n <= 1) else to_binary_list(n >> 1) + [n & 1]
def from_binary_list(bits):
"""Convert list of bits into an integer"""
result = 0
for bit in bits:
result = (result << 1) | bit
return result
def pad_bits(bits, n):
"""Prefix list of bits with enough zeros to reach n digits"""
assert(n >= len(bits))
return ([0] * (n - len(bits)) + bits)
示例用法:
>>> to_binary_list(14)
[1, 1, 1, 0]
>>> from_binary_list([1,1,1,0])
14
>>> pad_bits(to_binary_list(14),8)
[0, 0, 0, 0, 1, 1, 1, 0]
输出比特流
由于文件 I/O API 只允许我们保存整个字节,因此我们需要创建一个包装类来缓冲写入内存流中的位。
让我们提供写入单个位以及位序列的方法。
每个写入命令(1 位或更多位)将首先将位添加到缓冲区中。一旦缓冲区包含超过 8 位,则从前面删除 8 位组,转换为 [0-255] 范围内的整数并保存到输出文件。这样做直到缓冲区包含少于 8 位。
最后,让我们提供一种“刷新”流的方法——当缓冲区非空,但不包含足够的位来组成一个完整字节时,将零添加到最低有效位置直到有 8 位,然后写入字节。当我们关闭比特流时我们需要这个(我们稍后会看到其他一些好处)。
执行:
class OutputBitStream(object):
def __init__(self, file_name):
self.file_name = file_name
self.file = open(self.file_name, 'wb')
self.bytes_written = 0
self.buffer = []
def write_bit(self, value):
self.write_bits([value])
def write_bits(self, values):
self.buffer += values
while len(self.buffer) >= 8:
self._save_byte()
def flush(self):
if len(self.buffer) > 0: # Add trailing zeros to complete a byte and write it
self.buffer += [0] * (8 - len(self.buffer))
self._save_byte()
assert(len(self.buffer) == 0)
def _save_byte(self):
bits = self.buffer[:8]
self.buffer[:] = self.buffer[8:]
byte_value = from_binary_list(bits)
self.file.write(bytes([byte_value]))
self.bytes_written += 1
def close(self):
self.flush()
self.file.close()
输入比特流
输入比特流遵循类似的主题。我们希望一次读取 1 位或更多位。为此,我们从文件中加载字节,将每个字节转换为位列表并将其添加到缓冲区中,直到有足够的字节来满足读取请求。
在这种情况下,flush 命令会清除缓冲区(确保它只包含零)。
执行:
class InputBitStream(object):
def __init__(self, file_name):
self.file_name = file_name
self.file = open(self.file_name, 'rb')
self.bytes_read = 0
self.buffer = []
def read_bit(self):
return self.read_bits(1)[0]
def read_bits(self, count):
while len(self.buffer) < count:
self._load_byte()
result = self.buffer[:count]
self.buffer[:] = self.buffer[count:]
return result
def flush(self):
assert(not any(self.buffer))
self.buffer[:] = []
def _load_byte(self):
value = ord(self.file.read(1))
self.buffer += pad_bits(to_binary_list(value), 8)
self.bytes_read += 1
def close(self):
self.file.close()
压缩格式
接下来我们需要定义压缩比特流的格式。解码图像需要三个基本信息块:
- 图像的形状(高度和宽度),假设它是 3 通道 RGB 图像。
- 在解码端重建霍夫曼码所需的信息
- 霍夫曼编码的像素数据
让我们的压缩格式如下:
- 标题
- 图像高度(16 位,无符号)
- 图像宽度(16 位,无符号)
- 霍夫曼表(开始与整个字节对齐)
- 有关算法,请参见this。
- 像素代码(开始与整个字节对齐)
width * height * 3
霍夫曼码序列
压缩
执行:
from PIL import Image
def compressed_size(counts, codes):
header_size = 2 * 16 # height and width as 16 bit values
tree_size = len(counts) * (1 + 8) # Leafs: 1 bit flag, 8 bit symbol each
tree_size += len(counts) - 1 # Nodes: 1 bit flag each
if tree_size % 8 > 0: # Padding to next full byte
tree_size += 8 - (tree_size % 8)
# Sum for each symbol of count * code length
pixels_size = sum([count * len(codes[symbol]) for symbol, count in counts])
if pixels_size % 8 > 0: # Padding to next full byte
pixels_size += 8 - (pixels_size % 8)
return (header_size + tree_size + pixels_size) / 8
def encode_header(image, bitstream):
height_bits = pad_bits(to_binary_list(image.height), 16)
bitstream.write_bits(height_bits)
width_bits = pad_bits(to_binary_list(image.width), 16)
bitstream.write_bits(width_bits)
def encode_tree(tree, bitstream):
if type(tree) == tuple: # Note - write 0 and encode children
bitstream.write_bit(0)
encode_tree(tree[0], bitstream)
encode_tree(tree[1], bitstream)
else: # Leaf - write 1, followed by 8 bit symbol
bitstream.write_bit(1)
symbol_bits = pad_bits(to_binary_list(tree), 8)
bitstream.write_bits(symbol_bits)
def encode_pixels(image, codes, bitstream):
for pixel in image.getdata():
for value in pixel:
bitstream.write_bits(codes[value])
def compress_image(in_file_name, out_file_name):
print('Compressing "%s" -> "%s"' % (in_file_name, out_file_name))
image = Image.open(in_file_name)
print('Image shape: (height=%d, width=%d)' % (image.height, image.width))
size_raw = raw_size(image.height, image.width)
print('RAW image size: %d bytes' % size_raw)
counts = count_symbols(image)
print('Counts: %s' % counts)
tree = build_tree(counts)
print('Tree: %s' % str(tree))
trimmed_tree = trim_tree(tree)
print('Trimmed tree: %s' % str(trimmed_tree))
codes = assign_codes(trimmed_tree)
print('Codes: %s' % codes)
size_estimate = compressed_size(counts, codes)
print('Estimated size: %d bytes' % size_estimate)
print('Writing...')
stream = OutputBitStream(out_file_name)
print('* Header offset: %d' % stream.bytes_written)
encode_header(image, stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Tree offset: %d' % stream.bytes_written)
encode_tree(trimmed_tree, stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Pixel offset: %d' % stream.bytes_written)
encode_pixels(image, codes, stream)
stream.close()
size_real = stream.bytes_written
print('Wrote %d bytes.' % size_real)
print('Estimate is %scorrect.' % ('' if size_estimate == size_real else 'in'))
print('Compression ratio: %0.2f' % (float(size_raw) / size_real))
减压
执行:
from PIL import Image
def decode_header(bitstream):
height = from_binary_list(bitstream.read_bits(16))
width = from_binary_list(bitstream.read_bits(16))
return (height, width)
# https://stackoverflow.com/a/759766/3962537
def decode_tree(bitstream):
flag = bitstream.read_bits(1)[0]
if flag == 1: # Leaf, read and return symbol
return from_binary_list(bitstream.read_bits(8))
left = decode_tree(bitstream)
right = decode_tree(bitstream)
return (left, right)
def decode_value(tree, bitstream):
bit = bitstream.read_bits(1)[0]
node = tree[bit]
if type(node) == tuple:
return decode_value(node, bitstream)
return node
def decode_pixels(height, width, tree, bitstream):
pixels = bytearray()
for i in range(height * width * 3):
pixels.append(decode_value(tree, bitstream))
return Image.frombytes('RGB', (width, height), bytes(pixels))
def decompress_image(in_file_name, out_file_name):
print('Decompressing "%s" -> "%s"' % (in_file_name, out_file_name))
print('Reading...')
stream = InputBitStream(in_file_name)
print('* Header offset: %d' % stream.bytes_read)
height, width = decode_header(stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Tree offset: %d' % stream.bytes_read)
trimmed_tree = decode_tree(stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Pixel offset: %d' % stream.bytes_read)
image = decode_pixels(height, width, trimmed_tree, stream)
stream.close()
print('Read %d bytes.' % stream.bytes_read)
print('Image size: (height=%d, width=%d)' % (height, width))
print('Trimmed tree: %s' % str(trimmed_tree))
image.save(out_file_name)
测试运行
from PIL import ImageChops
def raw_size(width, height):
header_size = 2 * 16 # height and width as 16 bit values
pixels_size = 3 * 8 * width * height # 3 channels, 8 bits per channel
return (header_size + pixels_size) / 8
def images_equal(file_name_a, file_name_b):
image_a = Image.open(file_name_a)
image_b = Image.open(file_name_b)
diff = ImageChops.difference(image_a, image_b)
return diff.getbbox() is None
if __name__ == '__main__':
start = time.time()
compress_image('flag.png', 'answer.txt')
print('-' * 40)
decompress_image('answer.txt', 'flag_out.png')
stop = time.time()
times = (stop - start) * 1000
print('-' * 40)
print('Run time takes %d miliseconds' % times)
print('Images equal = %s' % images_equal('flag.png', 'flag_out.png'))
我使用您提供的示例图像运行了脚本。
控制台输出:
Compressing "flag.png" -> "answer.txt"
Image shape: (height=18, width=23)
RAW image size: 1246 bytes
Counts: [(24, 90), (131, 90), (215, 90), (59, 324), (60, 324), (110, 324)]
Tree: (1242, ((594, ((270, ((90, 215), (180, ((90, 24), (90, 131))))), (324, 59))), (648, ((324, 60), (324, 110)))))
Trimmed tree: (((215, (24, 131)), 59), (60, 110))
Codes: {215: [0, 0, 0], 24: [0, 0, 1, 0], 131: [0, 0, 1, 1], 59: [0, 1], 60: [1, 0], 110: [1, 1]}
Estimated size: 379 bytes
Writing...
* Header offset: 0
* Tree offset: 4
* Pixel offset: 12
Wrote 379 bytes.
Estimate is correct.
Compression ratio: 3.29
----------------------------------------
Decompressing "answer.txt" -> "flag_out.png"
Reading...
* Header offset: 0
* Tree offset: 4
* Pixel offset: 12
Read 379 bytes.
Image size: (height=18, width=23)
Trimmed tree: (((215, (24, 131)), 59), (60, 110))
----------------------------------------
Run time takes 32 miliseconds
Images equal = True
潜在的改进
- 每个颜色通道的霍夫曼表
- 调色板图像支持
- 转换滤波器(每个通道的增量编码,或更复杂的预测器)
- 处理重复的模型(RLE、LZ...)
- 规范霍夫曼表
推荐阅读
- python - while循环的文本表达式在python中不起作用
- javascript - 如何在对象上设置 useState 数据
- javascript - 当它们达到 60 时,我怎样才能将秒数重置为 0
- css - html body 元素的最小高度
- embedded-linux - 无法升级 Yocto Krogoth 中元层的配方
- r - 我已经编写了一个函数,并试图通过 lapply 将其传递给数据框中的每一行
- active-directory - 有没有办法在 DistinguishedName 中排序
- c# - asp.net 核心通过 ENV 变量覆盖连接字符串
- automation - 使用 Protractor 并行浏览器测试的最佳实践是什么?
- excel - VBA如何在用户表单中过滤和排序范围