HEX
Server: Apache
System: Linux srv1.prosuiteplus.com 5.4.0-216-generic #236-Ubuntu SMP Fri Apr 11 19:53:21 UTC 2025 x86_64
User: prosuiteplus (1001)
PHP: 8.3.20
Disabled: NONE
Upload Files
File: //lib/python3/dist-packages/pikepdf/models/image.py
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Copyright (C) 2017, James R. Barlow (https://github.com/jbarlow83/)

import struct
from abc import ABC, abstractmethod
from decimal import Decimal
from io import BytesIO
from itertools import zip_longest
from pathlib import Path
from shutil import copyfileobj
from tempfile import NamedTemporaryFile
from zlib import decompress
from zlib import error as ZlibError

from .. import Array, Dictionary, Name, Object, PdfError, Stream


class DependencyError(Exception):
    pass


class UnsupportedImageTypeError(Exception):
    pass


class NotExtractableError(Exception):
    pass


def array_str(value):
    if isinstance(value, (list, Array)):
        return [str(item) for item in value]
    if isinstance(value, Name):
        return [str(value)]
    raise NotImplementedError(value)


def array_str_colorspace(value):
    if isinstance(value, (list, Array)):
        items = [item for item in value]
        if len(items) == 4 and items[0] == '/Indexed':
            result = [str(items[n]) for n in range(3)]
            result.append(bytes(items[3]))
            return result
        if len(items) == 2 and items[0] == '/ICCBased':
            result = [str(items[0]), items[1]]
            return result
        return array_str(items)

    return array_str(value)


def dict_or_array_dict(value):
    if isinstance(value, list):
        return value
    if isinstance(value, Dictionary):
        return [value.as_dict()]
    if isinstance(value, Array):
        return [v.as_list() for v in value]
    raise NotImplementedError(value)


def metadata_from_obj(obj, name, type_, default):
    val = getattr(obj, name, default)
    try:
        return type_(val)
    except TypeError:
        if val is None:
            return None
    raise NotImplementedError('Metadata access for ' + name)


class PdfImageBase(ABC):

    SIMPLE_COLORSPACES = ('/DeviceRGB', '/DeviceGray', '/CalRGB', '/CalGray')

    @abstractmethod
    def _metadata(self, name, type_, default):
        pass

    @property
    def width(self):
        """Width of the image data in pixels"""
        return self._metadata('Width', int, None)

    @property
    def height(self):
        """Height of the image data in pixels"""
        return self._metadata('Height', int, None)

    @property
    def image_mask(self):
        """``True`` if this is an image mask"""
        return self._metadata('ImageMask', bool, False)

    @property
    def _bpc(self):
        """Bits per component for this image (low-level)"""
        return self._metadata('BitsPerComponent', int, None)

    @property
    def _colorspaces(self):
        """Colorspace (low-level)"""
        return self._metadata('ColorSpace', array_str_colorspace, [])

    @property
    def filters(self):
        """List of names of the filters that we applied to encode this image"""
        return self._metadata('Filter', array_str, [])

    @property
    def decode_parms(self):
        """List of the /DecodeParms, arguments to filters"""
        return self._metadata('DecodeParms', dict_or_array_dict, [])

    @property
    def colorspace(self):
        """PDF name of the colorspace that best describes this image"""
        if self.image_mask:
            return None  # Undefined for image masks
        if self._colorspaces:
            if self._colorspaces[0] in self.SIMPLE_COLORSPACES:
                return self._colorspaces[0]
            if self._colorspaces[0] in ('/DeviceCMYK', '/ICCBased'):
                return self._colorspaces[0]
            if (
                self._colorspaces[0] == '/Indexed'
                and self._colorspaces[1] in self.SIMPLE_COLORSPACES
            ):
                return self._colorspaces[1]
        raise NotImplementedError(
            "not sure how to get colorspace: " + repr(self._colorspaces)
        )

    @property
    def bits_per_component(self):
        """Bits per component of this image"""
        if self._bpc is None:
            return 1 if self.image_mask else 8
        return self._bpc

    @property
    @abstractmethod
    def is_inline(self):
        pass

    @property
    @abstractmethod
    def icc(self):
        pass

    @property
    def indexed(self):
        """``True`` if the image has a defined color palette"""
        return '/Indexed' in self._colorspaces

    @property
    def size(self):
        """Size of image as (width, height)"""
        return self.width, self.height

    @property
    def mode(self):
        """``PIL.Image.mode`` equivalent for this image, where possible

        If an ICC profile is attached to the image, we still attempt to resolve a Pillow
        mode.
        """

        m = ''
        if self.indexed:
            m = 'P'
        elif self.bits_per_component == 1:
            m = '1'
        elif self.bits_per_component == 8:
            if self.colorspace == '/DeviceRGB':
                m = 'RGB'
            elif self.colorspace == '/DeviceGray':
                m = 'L'
            elif self.colorspace == '/DeviceCMYK':
                m = 'CMYK'
            elif self.colorspace == '/ICCBased':
                try:
                    icc_profile = self._colorspaces[1]
                    icc_profile_nchannels = int(icc_profile['/N'])
                    if icc_profile_nchannels == 1:
                        m = 'L'
                    elif icc_profile_nchannels == 3:
                        m = 'RGB'
                    elif icc_profile_nchannels == 4:
                        m = 'CMYK'
                except (ValueError, TypeError):
                    pass
        if m == '':
            raise NotImplementedError("Not sure how to handle PDF image of this type")
        return m

    @property
    def filter_decodeparms(self):
        """PDF has a lot of optional data structures concerning /Filter and
        /DecodeParms. /Filter can be absent or a name or an array, /DecodeParms
        can be absent or a dictionary (if /Filter is a name) or an array (if
        /Filter is an array). When both are arrays the lengths match.

        Normalize this into:
        [(/FilterName, {/DecodeParmName: Value, ...}), ...]

        The order of /Filter matters as indicates the encoding/decoding sequence.
        """
        return list(zip_longest(self.filters, self.decode_parms, fillvalue={}))

    @property
    def palette(self):
        """Retrieves the color palette for this image

        Returns:
            tuple (base_colorspace: str, palette: bytes)
        """

        if not self.indexed:
            return None
        _idx, base, hival, lookup = None, None, None, None
        try:
            _idx, base, hival, lookup = self._colorspaces
        except ValueError as e:
            raise ValueError('Not sure how to interpret this palette') from e
        base = str(base)
        hival = int(hival)
        lookup = bytes(lookup)
        if not base in self.SIMPLE_COLORSPACES:
            raise NotImplementedError("not sure how to interpret this palette")
        if base == '/DeviceRGB':
            base = 'RGB'
        elif base == '/DeviceGray':
            base = 'L'
        return base, lookup

    @abstractmethod
    def as_pil_image(self):
        pass

    @staticmethod
    def _unstack_compression(buffer, filters):
        """Remove stacked compression where it appears.

        Stacked compression means when an image is set to:
            ``[/FlateDecode /DCTDecode]``
        for example.

        Only Flate can be stripped off the front currently.

        Args:
            buffer (pikepdf._qpdf.Buffer): the compressed image data
            filters (list of str): all files on the data
        """
        data = memoryview(buffer)
        while len(filters) > 1 and filters[0] == '/FlateDecode':
            try:
                data = decompress(data)
            except ZlibError as e:
                raise UnsupportedImageTypeError() from e
            filters = filters[1:]
        return data, filters


class PdfImage(PdfImageBase):
    """Support class to provide a consistent API for manipulating PDF images

    The data structure for images inside PDFs is irregular and flexible,
    making it difficult to work with without introducing errors for less
    typical cases. This class addresses these difficulties by providing a
    regular, Pythonic API similar in spirit (and convertible to) the Python
    Pillow imaging library.
    """

    def __new__(cls, obj):
        instance = super().__new__(cls)
        instance.__init__(obj)
        if '/JPXDecode' in instance.filters:
            instance = super().__new__(PdfJpxImage)
            instance.__init__(obj)
        return instance

    def __init__(self, obj):
        """Construct a PDF image from a Image XObject inside a PDF

        ``pim = PdfImage(page.Resources.XObject['/ImageNN'])``

        Args:
            obj (pikepdf.Object): an Image XObject

        """
        if isinstance(obj, Stream) and obj.stream_dict.get("/Subtype") != "/Image":
            raise TypeError("can't construct PdfImage from non-image")
        self.obj = obj
        self._icc = None

    @classmethod
    def _from_pil_image(cls, *, pdf, page, name, image):  # pragma: no cover
        """Insert a PIL image into a PDF (rudimentary)

        Args:
            pdf (pikepdf.Pdf): the PDF to attach the image to
            page (pikepdf.Object): the page to attach the image to
            name (str or pikepdf.Name): the name to set the image
            image (PIL.Image.Image): the image to insert
        """

        data = image.tobytes()

        imstream = Stream(pdf, data)
        imstream.Type = Name('/XObject')
        imstream.Subtype = Name('/Image')
        if image.mode == 'RGB':
            imstream.ColorSpace = Name('/DeviceRGB')
        elif image.mode in ('1', 'L'):
            imstream.ColorSpace = Name('/DeviceGray')
        imstream.BitsPerComponent = 1 if image.mode == '1' else 8
        imstream.Width = image.width
        imstream.Height = image.height

        page.Resources.XObject[name] = imstream

        return cls(imstream)

    def _metadata(self, name, type_, default):
        return metadata_from_obj(self.obj, name, type_, default)

    @property
    def is_inline(self):
        """``False`` for image XObject"""
        return False

    @property
    def icc(self):
        """If an ICC profile is attached, return a Pillow object that describe it.

        Most of the information may be found in ``icc.profile``.

        Returns:
            PIL.ImageCms.ImageCmsProfile
        """
        from PIL import ImageCms

        if self.colorspace != '/ICCBased':
            return None
        if not self._icc:
            iccstream = self._colorspaces[1]
            iccbuffer = iccstream.get_stream_buffer()
            iccbytesio = BytesIO(iccbuffer)
            self._icc = ImageCms.ImageCmsProfile(iccbytesio)
        return self._icc

    def _extract_direct(self, *, stream):
        """Attempt to extract the image directly to a usable image file

        If there is no way to extract the image without decompressing or
        transcoding then raise an exception. The type and format of image
        generated will vary.

        Args:
            stream: Writable stream to write data to
        """

        def normal_dct_rgb():
            # Normal DCTDecode RGB images have the default value of
            # /ColorTransform 1 and are actually in YUV. Such a file can be
            # saved as a standard JPEG. RGB JPEGs without YUV conversion can't
            # be saved as JPEGs, and are probably bugs. Some software in the
            # wild actually produces RGB JPEGs in PDFs (probably a bug).
            DEFAULT_CT_RGB = 1
            ct = self.filter_decodeparms[0][1].get('/ColorTransform', DEFAULT_CT_RGB)
            return self.mode == 'RGB' and ct == DEFAULT_CT_RGB

        def normal_dct_cmyk():
            # Normal DCTDecode CMYKs have /ColorTransform 0 and can be saved.
            # There is a YUVK colorspace but CMYK JPEGs don't generally use it
            DEFAULT_CT_CMYK = 0
            ct = self.filter_decodeparms[0][1].get('/ColorTransform', DEFAULT_CT_CMYK)
            return self.mode == 'CMYK' and ct == DEFAULT_CT_CMYK

        data, filters = self._unstack_compression(
            self.obj.get_raw_stream_buffer(), self.filters
        )

        if filters == ['/CCITTFaxDecode']:
            if self.colorspace == '/ICCBased':
                raise UnsupportedImageTypeError("Cannot direct-extract CCITT + ICC")
            stream.write(self._generate_ccitt_header(data))
            stream.write(data)
            return '.tif'
        elif filters == ['/DCTDecode'] and (
            self.mode == 'L' or normal_dct_rgb() or normal_dct_cmyk()
        ):
            stream.write(data)
            return '.jpg'

        raise NotExtractableError()

    def _extract_transcoded(self):
        from PIL import Image

        im = None
        if self.mode == 'RGB' and self.bits_per_component == 8:
            # No point in accessing the buffer here, size qpdf decodes to 3-byte
            # RGB and Pillow needs RGBX for raw access
            data = self.read_bytes()
            im = Image.frombytes('RGB', self.size, data)
        elif self.mode in ('L', 'P') and self.bits_per_component == 8:
            buffer = self.get_stream_buffer()
            stride = 0  # tell Pillow to calculate stride from line width
            ystep = 1  # image is top to bottom in memory
            im = Image.frombuffer('L', self.size, buffer, "raw", 'L', stride, ystep)
            if self.mode == 'P':
                base_mode, palette = self.palette
                if base_mode in ('RGB', 'L'):
                    im.putpalette(palette, rawmode=base_mode)
                else:
                    raise NotImplementedError('palette with ' + base_mode)
        elif self.mode == '1' and self.bits_per_component == 1:
            data = self.read_bytes()
            im = Image.frombytes('1', self.size, data)

        elif self.mode == 'P' and self.bits_per_component == 1:
            data = self.read_bytes()
            im = Image.frombytes('1', self.size, data)

            base_mode, palette = self.palette
            if base_mode == 'RGB' and palette != b'\x00\x00\x00\xff\xff\xff':
                im = im.convert('P')
                im.putpalette(palette, rawmode=base_mode)
                gp = im.getpalette()
                gp[765:768] = gp[3:6]  # work around Pillow bug
                im.putpalette(gp)
            elif base_mode == 'L' and palette != b'\x00\xff':
                im = im.convert('P')
                im.putpalette(palette, rawmode=base_mode)
                gp = im.getpalette()
                gp[255] = gp[1]  # work around Pillow bug
                im.putpalette(gp)

        if self.colorspace == '/ICCBased':
            im.info['icc_profile'] = self.icc.tobytes()

        return im

    def _extract_to_stream(self, *, stream):
        """Attempt to extract the image directly to a usable image file

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Because it is not known until attempted what image format will be
        extracted, users should not assume what format they are getting back.
        When saving the image to a file, use a temporary filename, and then
        rename the file to its final name based on the returned file extension.

        Args:
            stream: Writable stream to write data to

        Returns:
            str: The file format extension
        """

        try:
            return self._extract_direct(stream=stream)
        except NotExtractableError:
            pass

        try:
            im = self._extract_transcoded()
            if im:
                im.save(stream, format='png')
                return '.png'
        except PdfError as e:
            if 'getStreamData called on unfilterable stream' in str(e):
                raise UnsupportedImageTypeError(repr(self)) from e
            raise

        raise UnsupportedImageTypeError(repr(self))

    def extract_to(self, *, stream=None, fileprefix=''):
        """Attempt to extract the image directly to a usable image file

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Because it is not known until attempted what image format will be
        extracted, users should not assume what format they are getting back.
        When saving the image to a file, use a temporary filename, and then
        rename the file to its final name based on the returned file extension.

        Examples:

            >>> im.extract_to(stream=bytes_io)
            '.png'

            >>> im.extract_to(fileprefix='/tmp/image00')
            '/tmp/image00.jpg'

        Args:
            stream: Writable stream to write data to.
            fileprefix (str or Path): The path to write the extracted image to,
                without the file extension.

        Returns:
            If *fileprefix* was provided, then the fileprefix with the
            appropriate extension. If no *fileprefix*, then an extension
            indicating the file type.

        Return type:
            str
        """

        if bool(stream) == bool(fileprefix):
            raise ValueError("Cannot set both stream and fileprefix")
        if stream:
            return self._extract_to_stream(stream=stream)

        bio = BytesIO()
        extension = self._extract_to_stream(stream=bio)
        bio.seek(0)
        filepath = Path(str(Path(fileprefix)) + extension)
        with filepath.open('wb') as target:
            copyfileobj(bio, target)
        return str(filepath)

    def read_bytes(self):
        """Decompress this image and return it as unencoded bytes"""
        return self.obj.read_bytes()

    def get_stream_buffer(self):
        """Access this image with the buffer protocol"""
        return self.obj.get_stream_buffer()

    def as_pil_image(self):
        """Extract the image as a Pillow Image, using decompression as necessary

        Returns:
            PIL.Image.Image
        """
        from PIL import Image

        try:
            bio = BytesIO()
            self._extract_direct(stream=bio)
            bio.seek(0)
            return Image.open(bio)
        except NotExtractableError:
            pass

        im = self._extract_transcoded()
        if not im:
            raise UnsupportedImageTypeError(repr(self))

        return im

    def _generate_ccitt_header(self, data):
        """Construct a CCITT G3 or G4 header from the PDF metadata"""
        # https://stackoverflow.com/questions/2641770/
        # https://www.itu.int/itudoc/itu-t/com16/tiff-fx/docs/tiff6.pdf

        if not self.decode_parms:
            raise ValueError("/CCITTFaxDecode without /DecodeParms")

        if self.decode_parms[0].get("/EncodedByteAlign", False):
            raise UnsupportedImageTypeError(
                "/CCITTFaxDecode with /EncodedByteAlign true"
            )

        k = self.decode_parms[0].get("/K", 0)
        if k < 0:
            ccitt_group = 4  # Pure two-dimensional encoding (Group 4)
        elif k > 0:
            ccitt_group = 3  # Group 3 2-D
        else:
            ccitt_group = 2  # CCITT 1-D
        black_is_one = self.decode_parms[0].get("/BlackIs1", False)
        # TIFF spec says:
        # use 0 for white_is_zero (=> black is 1)
        # use 1 for black_is_zero (=> white is 1)
        # In practice, do the opposite of what the TIFF spec says.
        photometry = 1 if black_is_one else 0

        img_size = len(data)
        tiff_header_struct = '<' + '2s' + 'H' + 'L' + 'H' + 'HHLL' * 8 + 'L'
        # fmt: off
        tiff_header = struct.pack(
            tiff_header_struct,
            b'II',  # Byte order indication: Little endian
            42,  # Version number (always 42)
            8,  # Offset to first IFD
            8,  # Number of tags in IFD
            256, 4, 1, self.width,  # ImageWidth, LONG, 1, width
            257, 4, 1, self.height,  # ImageLength, LONG, 1, length
            258, 3, 1, 1,  # BitsPerSample, SHORT, 1, 1
            259, 3, 1, ccitt_group,  # Compression, SHORT, 1, 4 = CCITT Group 4 fax encoding
            262, 3, 1, int(photometry),  # Thresholding, SHORT, 1, 0 = WhiteIsZero
            273, 4, 1, struct.calcsize(tiff_header_struct),  # StripOffsets, LONG, 1, length of header
            278, 4, 1, self.height,
            279, 4, 1, img_size,  # StripByteCounts, LONG, 1, size of image
            0  # last IFD
        )
        # fmt: on
        return tiff_header

    def show(self):
        """Show the image however PIL wants to"""
        self.as_pil_image().show()

    def __repr__(self):
        return '<pikepdf.PdfImage image mode={} size={}x{} at {}>'.format(
            self.mode, self.width, self.height, hex(id(self))
        )

    def _repr_png_(self):
        """Display hook for IPython/Jupyter"""
        b = BytesIO()
        im = self.as_pil_image()
        im.save(b, 'PNG')
        return b.getvalue()


class PdfJpxImage(PdfImage):
    def __init__(self, obj):
        super().__init__(obj)
        self.pil = self.as_pil_image()

    def _extract_direct(self, *, stream):
        data, filters = self._unstack_compression(
            self.obj.get_raw_stream_buffer(), self.filters
        )
        if filters != ['/JPXDecode']:
            raise UnsupportedImageTypeError(self.filters)
        stream.write(data)
        return '.jp2'

    @property
    def _colorspaces(self):
        # (PDF 1.7 Table 89) If ColorSpace is present, any colour space
        # specifications in the JPEG2000 data shall be ignored.
        super_colorspaces = super()._colorspaces
        if super_colorspaces:
            return super_colorspaces
        if self.pil.mode == 'L':
            return ['/DeviceGray']
        elif self.pil.mode == 'RGB':
            return ['/DeviceRGB']
        raise NotImplementedError('Complex JP2 colorspace')

    @property
    def _bpc(self):
        # (PDF 1.7 Table 89) If the image stream uses the JPXDecode filter, this
        # entry is optional and shall be ignored if present. The bit depth is
        # determined by the conforming reader in the process of decoding the
        # JPEG2000 image.
        return 8

    @property
    def indexed(self):
        # Nothing in the spec precludes an Indexed JPXDecode image, except for
        # the fact that doing so is madness. Let's assume it no one is that
        # insane.
        return False

    def __repr__(self):
        return '<pikepdf.PdfJpxImage JPEG2000 image mode={} size={}x{} at {}>'.format(
            self.mode, self.width, self.height, hex(id(self))
        )


class PdfInlineImage(PdfImageBase):
    """Support class for PDF inline images"""

    # Inline images can contain abbreviations that we write automatically
    ABBREVS = {
        b'/W': b'/Width',
        b'/H': b'/Height',
        b'/BPC': b'/BitsPerComponent',
        b'/IM': b'/ImageMask',
        b'/CS': b'/ColorSpace',
        b'/F': b'/Filter',
        b'/DP': b'/DecodeParms',
        b'/G': b'/DeviceGray',
        b'/RGB': b'/DeviceRGB',
        b'/CMYK': b'/DeviceCMYK',
        b'/I': b'/Indexed',
        b'/AHx': b'/ASCIIHexDecode',
        b'/A85': b'/ASCII85Decode',
        b'/LZW': b'/LZWDecode',
        b'/RL': b'/RunLengthDecode',
        b'/CCF': b'/CCITTFaxDecode',
        b'/DCT': b'/DCTDecode',
    }

    def __init__(self, *, image_data, image_object: tuple):
        """
        Args:
            image_data: data stream for image, extracted from content stream
            image_object: the metadata for image, also from content stream
        """

        # Convert the sequence of pikepdf.Object from the content stream into
        # a dictionary object by unparsing it (to bytes), eliminating inline
        # image abbreviations, and constructing a bytes string equivalent to
        # what an image XObject would look like. Then retrieve data from there

        self._data = image_data
        self._image_object = image_object

        reparse = b' '.join(self._unparse_obj(obj) for obj in image_object)
        try:
            reparsed_obj = Object.parse(b'<< ' + reparse + b' >>')
        except PdfError as e:
            raise PdfError("parsing inline " + reparse.decode('unicode_escape')) from e
        self.obj = reparsed_obj
        self.pil = None

    @classmethod
    def _unparse_obj(cls, obj):
        if isinstance(obj, Object):
            if isinstance(obj, Name):
                name = obj.unparse(resolved=True)
                assert isinstance(name, bytes)
                return cls.ABBREVS.get(name, name)
            else:
                return obj.unparse(resolved=True)
        elif isinstance(obj, bool):
            return b'true' if obj else b'false'  # Lower case for PDF spec
        elif isinstance(obj, (int, Decimal, float)):
            return str(obj).encode('ascii')
        else:
            raise NotImplementedError(repr(obj))

    def _metadata(self, name, type_, default):
        return metadata_from_obj(self.obj, name, type_, default)

    def unparse(self):
        tokens = []
        tokens.append(b'BI')
        metadata = []
        for metadata_obj in self._image_object:
            unparsed = self._unparse_obj(metadata_obj)
            assert isinstance(unparsed, bytes)
            metadata.append(unparsed)
        tokens.append(b' '.join(metadata))
        tokens.append(b'ID')
        tokens.append(self._data._inline_image_raw_bytes())
        tokens.append(b'EI')
        return b'\n'.join(tokens)

    @property
    def is_inline(self):
        return True

    @property
    def icc(self):
        raise ValueError("Inline images may not have ICC profiles")

    def __repr__(self):
        mode = '?'
        try:
            mode = self.mode
        except Exception:
            pass
        return '<pikepdf.PdfInlineImage image mode={} size={}x{} at {}>'.format(
            mode, self.width, self.height, hex(id(self))
        )

    def as_pil_image(self):
        if self.pil:
            return self.pil

        raise NotImplementedError('not yet')

    def extract_to(
        self, *, stream=None, fileprefix=''
    ):  # pylint: disable=unused-argument
        raise UnsupportedImageTypeError("inline images don't support extract")

    def read_bytes(self):
        raise NotImplementedError("qpdf returns compressed")
        # return self._data._inline_image_bytes()

    def get_stream_buffer(self):
        raise NotImplementedError("qpdf returns compressed")
        # return memoryview(self._data.inline_image_bytes())