jay's blog
图片压缩
使用python调用optipng和jpegoptim来压缩图片

linux上使用optipng和jpegoptim压缩,基于python实现。需要先安装optipng和jpegoptim。代码如下

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Created on 7/22/15

@author: tianjie
"""
import os, Queue, thread, multiprocessing, threading, time, sys
from os import path
from imghdr import what as img_type


class AtomicBoolean:
    """
    原子bool
    """
    lock = threading.Condition()
    flag = False

    def __init__(self, _flag):
        self.flag = _flag

    def compare_and_set(self, _flag):
        self.lock.acquire()
        try:
            if self.flag != _flag:
                self.flag = _flag
        finally:
            self.lock.release()

    def get(self):
        self.lock.acquire()
        try:
            return self.flag
        finally:
            self.lock.release()


# 控制工作线程退出
stop_flag = AtomicBoolean(False)
# 待优化图片队列
queue = Queue.Queue(100000)
# cpu核心数
cpu_size = multiprocessing.cpu_count()
# 控制主线程退出信号
sem = threading.Semaphore(0)


def search_img(_abspath):
    """
    https://docs.python.org/2/library/imghdr.html
    """
    if not path.exists(_abspath):
        print u"指定的路径%s不存在" % _abspath
        return

    if path.isfile(_abspath):
        if img_type(_abspath) == 'png' \
                or img_type(_abspath) == 'jpeg':
            queue.put(_abspath)
    else:
        sub_files = os.listdir(_abspath)
        for _file in sub_files:
            _path = path.join(_abspath, _file)
            search_img(_path)


def optimize_img():
    """
    处理文件。直到关闭标记和队列均为空才退出
    :return:
    """
    while not (queue.empty() and stop_flag.get()):
        _file = ""
        try:
            if queue.empty():
                time.sleep(1)
                continue

            _file = queue.get_nowait()
            if img_type(_file) == 'jpeg':
                cmd = "%s \"%s\"" % ("jpegoptim", _file)
            elif img_type(_file) == 'png':
                cmd = "%s \"%s\"" % ("optipng", _file)
            else:
                continue
            os.system(cmd)
            # print(u"%s处理完成" % _file)
        except:
            print(u"%s处理失败" % _file)
    sem.release()


def init_threads():
    for idx in range(0, cpu_size):
        thread.start_new_thread(optimize_img, ())


def main(_abspath):
    """
    扫描图片由main函数完成,处理由工作线程完成
    主线程申请cpu_size数个被阻塞,直到工作线程一一释放信号。注意:信号中默认存放0个.
    :param _abspath:
    :return:
    """
    init_threads()
    search_img(_abspath)
    stop_flag.compare_and_set(True)
    sem.acquire(cpu_size)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("usage:python optimizeImg.py /path/to/your/image/folder")
    else:
        main(sys.argv[1])

最后修改于 2015-07-23