diff --git a/invokeai/app/util/read_watermark.py b/invokeai/app/util/read_watermark.py new file mode 100644 index 00000000000..5fe492d2b04 --- /dev/null +++ b/invokeai/app/util/read_watermark.py @@ -0,0 +1,34 @@ +"""CLI command to decode invisible watermarks from Invoke-generated images.""" + +import argparse +import sys + +from PIL import Image + +from invokeai.backend.image_util.invisible_watermark import InvisibleWatermark + + +def read_watermark() -> None: + """Read and print invisible watermarks from a list of image files.""" + parser = argparse.ArgumentParser( + prog="invoke-readwatermark", + description="Decode invisible watermarks from Invoke-generated images.", + ) + parser.add_argument("images", nargs="+", metavar="IMAGE", help="Image file(s) to read watermarks from.") + parser.add_argument( + "--length", + type=int, + default=8, + metavar="BYTES", + help="Expected watermark length in bytes (default: %(default)s, matching the default 'InvokeAI' watermark text).", + ) + args = parser.parse_args() + + for path in args.images: + try: + image = Image.open(path) + except OSError as e: + print(f"{path}: error opening image: {e}", file=sys.stderr) + continue + watermark = InvisibleWatermark.decode_watermark(image, watermark_length=args.length) + print(f"{path}: {watermark}") diff --git a/invokeai/backend/image_util/invisible_watermark.py b/invokeai/backend/image_util/invisible_watermark.py index 5b0b2dbb5b1..6a87f4da47e 100644 --- a/invokeai/backend/image_util/invisible_watermark.py +++ b/invokeai/backend/image_util/invisible_watermark.py @@ -9,7 +9,7 @@ from PIL import Image import invokeai.backend.util.logging as logger -from invokeai.backend.image_util.imwatermark.vendor import WatermarkEncoder +from invokeai.backend.image_util.imwatermark.vendor import WatermarkDecoder, WatermarkEncoder class InvisibleWatermark: @@ -25,3 +25,23 @@ def add_watermark(cls, image: Image.Image, watermark_text: str) -> Image.Image: encoder.set_watermark("bytes", watermark_text.encode("utf-8")) bgr_encoded = encoder.encode(bgr, "dwtDct") return Image.fromarray(cv2.cvtColor(bgr_encoded, cv2.COLOR_BGR2RGB)).convert("RGBA") + + @classmethod + def decode_watermark(cls, image: Image.Image, watermark_length: int = 8) -> str: + """Decode an invisible watermark from an image. + + Args: + image: The PIL image to decode the watermark from. + watermark_length: The length of the watermark in bytes (default 8, matching the default "InvokeAI" watermark text). + + Returns: + The decoded watermark text, or an empty string if decoding fails. + """ + bgr = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR) + decoder = WatermarkDecoder("bytes", watermark_length * 8) + try: + raw = decoder.decode(bgr, "dwtDct") + return raw.rstrip(b"\x00").decode("utf-8", errors="replace") + except (RuntimeError, ValueError, NameError) as e: + logger.debug("Failed to decode watermark: %s", e) + return "" diff --git a/pyproject.toml b/pyproject.toml index adfe5982baf..a7cf0d3a420 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -159,6 +159,7 @@ explicit = true [project.scripts] "invokeai-web" = "invokeai.app.run_app:run_app" +"invoke-readwatermark" = "invokeai.app.util.read_watermark:read_watermark" [project.urls] "Homepage" = "https://invoke-ai.github.io/InvokeAI/"