@@ -2,21 +2,32 @@
2 | 2 | import json |
3 | 3 | import os |
4 | 4 | import re |
5 | | -from logging import getLogger |
| 5 | +import logging |
6 | 6 | from typing import List |
7 | | - |
| 7 | +from concurrent.futures import ThreadPoolExecutor |
8 | 8 | import cv2 |
9 | 9 | import numpy as np |
10 | 10 | import xmltodict |
11 | | -from PIL import Image |
12 | | - |
| 11 | +from PIL import Image, ImageColor, ImageDraw |
13 | 12 | from fastlabel import const, converters, utils |
14 | | -from fastlabel.const import AnnotationType |
| 13 | +from fastlabel.const import ( |
| 14 | + EXPORT_IMAGE_WITH_ANNOTATIONS_SUPPORTED_IMAGE_TYPES, |
| 15 | + KEYPOINT_MIN_STROKE_WIDTH, |
| 16 | + OPACITY_DARK, |
| 17 | + OPACITY_THIN, |
| 18 | + POSE_ESTIMATION_MIN_STROKE_WIDTH, |
| 19 | + SEPARATOER, |
| 20 | + AnnotationType, |
| 21 | +) |
15 | 22 | |
16 | 23 | from .api import Api |
17 | 24 | from .exceptions import FastLabelInvalidException |
18 | 25 | |
19 | | -logger = getLogger(__name__) |
| 26 | +logger = logging.getLogger(__name__) |
| 27 | +logging.basicConfig( |
| 28 | + level=logging.INFO, |
| 29 | + format="%(levelname)s: %(message)s", |
| 30 | +) |
20 | 31 | |
21 | 32 | |
22 | 33 | class Client: |
@@ -2225,6 +2236,278 @@ def __get_cv_draw_points(self, points: List[int]) -> List[int]: |
2225 | 2236 | cv_points.append((new_points[i * 2], new_points[i * 2 + 1])) |
2226 | 2237 | return np.array(cv_points) |
2227 | 2238 | |
| 2239 | + def __reverse_points(self, points: List[int]) -> List[int]: |
| 2240 | + """ |
| 2241 | + Reverse the order of (x, y) point pairs, e.g. |
| 2242 | + [4, 5, 4, 9, 8, 9, 8, 5, 4, 5] => [4, 5, 8, 5, 8, 9, 4, 9, 4, 5] |
| 2243 | + """ |
| 2244 | + reversed_points = [] |
| 2245 | + for index, _ in enumerate(points): |
| 2246 | + if index % 2 == 0: |
| 2247 | + reversed_points.insert(0, points[index + 1]) |
| 2248 | + reversed_points.insert(0, points[index]) |
| 2249 | + return reversed_points |
| 2250 | + |
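The pair-reversal above flips a ring's winding order while keeping each coordinate pair intact. A minimal standalone sketch (a hypothetical free-function mirror of the private method, checked against the docstring's example):

    def reverse_points(points):
        # Prepend each (x, y) pair: the ring order reverses, but x and y
        # are never swapped within a pair.
        reversed_points = []
        for i in range(0, len(points), 2):
            reversed_points.insert(0, points[i + 1])  # y
            reversed_points.insert(0, points[i])      # x
        return reversed_points

    assert reverse_points([4, 5, 4, 9, 8, 9, 8, 5, 4, 5]) == [4, 5, 8, 5, 8, 9, 4, 9, 4, 5]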
| 2251 | + def __create_image_with_annotation(self, img_file_path_task): |
| 2252 | + img_file_path, task, output_dir = img_file_path_task |
| 2253 | + img = Image.open(img_file_path).convert("RGB") |
| 2254 | + width, height = img.size |
| 2255 | + if width > height: |
| 2256 | + stroke_width = int(height / 300) |
| 2257 | + else: |
| 2258 | + stroke_width = int(width / 300) |
| 2259 | + stroke_width = stroke_width if stroke_width > 1 else 1 |
| 2260 | + draw_img = ImageDraw.Draw(img, "RGBA") |
| 2261 | + # For segmentation task |
| 2262 | + is_seg = False |
| 2263 | + seg_mask_images = [] |
| 2264 | + task_annotations = task["annotations"] |
| 2265 | + for task_annotation in task_annotations: |
| 2266 | + # Draw annotations in content |
| 2267 | + rgb = None |
| 2268 | + try: |
| 2269 | + rgb = ImageColor.getcolor(task_annotation["color"], "RGB") |
| 2270 | + except Exception as e: |
| 2271 | + logger.info(e) |
| 2272 | + if not rgb: |
| 2273 | + continue |
| 2274 | + rgba_dark = rgb + (OPACITY_DARK,) |
| 2275 | + rgba_thin = rgb + (OPACITY_THIN,) |
| 2276 | + if AnnotationType(task_annotation["type"]) == AnnotationType.bbox: |
| 2277 | + points = task_annotation["points"] |
| 2278 | + draw_img.rectangle( |
| 2279 | + points, fill=rgba_thin, outline=rgba_dark, width=stroke_width |
| 2280 | + ) |
| 2281 | + elif AnnotationType(task_annotation["type"]) == AnnotationType.circle: |
| 2282 | + x = task_annotation["points"][0] |
| 2283 | + y = task_annotation["points"][1] |
| 2284 | + radius = task_annotation["points"][2] |
| 2285 | + points = [ |
| 2286 | + x - radius, |
| 2287 | + y - radius, |
| 2288 | + x + radius, |
| 2289 | + y + radius, |
| 2290 | + ] |
| 2291 | + draw_img.ellipse(points, fill=rgba_dark, width=radius) |
| 2292 | + elif AnnotationType(task_annotation["type"]) == AnnotationType.polygon: |
| 2293 | + points = task_annotation["points"] |
| 2294 | + # close the ring: the outline requires the start point repeated at the end |
| 2295 | + points.append(points[0]) |
| 2296 | + points.append(points[1]) |
| 2297 | + draw_img.line(points, fill=rgba_dark, width=stroke_width) |
| 2298 | + draw_img.polygon(points, fill=rgba_thin) |
| 2299 | + elif AnnotationType(task_annotation["type"]) == AnnotationType.keypoint: |
| 2300 | + x = task_annotation["points"][0] |
| 2301 | + y = task_annotation["points"][1] |
| 2302 | + if stroke_width < KEYPOINT_MIN_STROKE_WIDTH: |
| 2303 | + stroke_width = KEYPOINT_MIN_STROKE_WIDTH |
| 2304 | + points = [ |
| 2305 | + x - stroke_width, |
| 2306 | + y - stroke_width, |
| 2307 | + x + stroke_width, |
| 2308 | + y + stroke_width, |
| 2309 | + ] |
| 2310 | + draw_img.ellipse(points, fill=rgba_dark, width=stroke_width) |
| 2311 | + elif AnnotationType(task_annotation["type"]) == AnnotationType.line: |
| 2312 | + points = task_annotation["points"] |
| 2313 | + draw_img.line(points, fill=rgba_dark, width=stroke_width) |
| 2314 | + elif AnnotationType(task_annotation["type"]) == AnnotationType.segmentation: |
| 2315 | + is_seg = True |
| 2316 | + rgba_seg = rgb + (OPACITY_THIN * 2,) |
| 2317 | + seg_mask_ground = Image.new("RGBA", (width, height), (0, 0, 0, 0)) |
| 2318 | + seg_mask_im = np.array(seg_mask_ground) |
| 2319 | + for region in task_annotation["points"]: |
| 2320 | + count = 0 |
| 2321 | + for points in region: |
| 2322 | + if count == 0: |
| 2323 | + cv_draw_points = self.__get_cv_draw_points(points) |
| 2324 | + # fillPoly cannot render diagonal segmentation points as-is, so reshape |
| 2325 | + # them for OpenCV. Remove this once multi-image projects are pixel-mode only. |
| 2326 | + converted_points = ( |
| 2327 | + np.array(cv_draw_points) |
| 2328 | + .reshape((-1, 1, 2)) |
| 2329 | + .astype(np.int32) |
| 2330 | + ) |
| 2331 | + cv2.fillPoly( |
| 2332 | + seg_mask_im, |
| 2333 | + [converted_points], |
| 2334 | + rgba_seg, |
| 2335 | + lineType=cv2.LINE_8, |
| 2336 | + shift=0, |
| 2337 | + ) |
| 2338 | + else: |
| 2339 | + # Hollow (hole) points are counter-clockwise, so reverse them for OpenCV |
| 2340 | + cv_draw_points = self.__get_cv_draw_points( |
| 2341 | + self.__reverse_points(points) |
| 2342 | + ) |
| 2343 | + converted_points = ( |
| 2344 | + np.array(cv_draw_points) |
| 2345 | + .reshape((-1, 1, 2)) |
| 2346 | + .astype(np.int32) |
| 2347 | + ) |
| 2348 | + cv2.fillPoly( |
| 2349 | + seg_mask_im, |
| 2350 | + [converted_points], |
| 2351 | + (0, 0, 0, 0), |
| 2352 | + lineType=cv2.LINE_8, |
| 2353 | + shift=0, |
| 2354 | + ) |
| 2355 | + count += 1 |
| 2356 | + seg_mask_images.append(seg_mask_im) |
| 2357 | + elif ( |
| 2358 | + AnnotationType(task_annotation["type"]) |
| 2359 | + == AnnotationType.pose_estimation |
| 2360 | + ): |
| 2361 | + """ |
| 2362 | + { |
| 2363 | + keypoint_id: { |
| 2364 | + point: [x, y], |
| 2365 | + keypoint_rgb: keypoint.color |
| 2366 | + } |
| 2367 | + } |
| 2368 | + """ |
| 2369 | + if stroke_width < POSE_ESTIMATION_MIN_STROKE_WIDTH: |
| 2370 | + stroke_width = POSE_ESTIMATION_MIN_STROKE_WIDTH |
| 2371 | + linked_points_and_color_to_key_map = {} |
| 2372 | + relations = [] |
| 2373 | + for task_annotation_keypoint in task_annotation["keypoints"]: |
| 2374 | + try: |
| 2375 | + task_annotation_keypoint_keypoint_color = task_annotation[ |
| 2376 | + "color" |
| 2377 | + ] |
| 2378 | + task_annotation_keypoint_name = task_annotation_keypoint["name"] |
| 2379 | + task_annotation_keypoint_value = task_annotation_keypoint[ |
| 2380 | + "value" |
| 2381 | + ] |
| 2382 | + task_annotation_keypoint_key = task_annotation_keypoint["key"] |
| 2383 | + keypoint_rgb = ImageColor.getcolor( |
| 2384 | + task_annotation_keypoint_keypoint_color, "RGB" |
| 2385 | + ) |
| 2386 | + except Exception as e: |
| 2387 | + # Skip this keypoint: falling through could reuse a stale |
| 2388 | + # keypoint_rgb or reference unbound variables below. |
| 2389 | + logger.info(f"Invalid keypoint: {task_annotation_keypoint}, {e}") |
| 2390 | + continue |
| 2391 | + if not keypoint_rgb: |
| 2392 | + continue |
| 2393 | + if not task_annotation_keypoint_value: |
| 2394 | + continue |
| 2395 | + |
| 2396 | + x = task_annotation_keypoint_value[0] |
| 2397 | + y = task_annotation_keypoint_value[1] |
| 2398 | + linked_points_and_color_to_key_map[task_annotation_keypoint_key] = { |
| 2399 | + "point": [x, y], |
| 2400 | + "keypoint_rgb": keypoint_rgb, |
| 2401 | + } |
| 2402 | + for edge in task_annotation_keypoint["edges"]: |
| 2403 | + relations.append( |
| 2404 | + SEPARATOER.join( |
| 2405 | + sorted([task_annotation_keypoint_key, edge]) |
| 2406 | + ) |
| 2407 | + ) |
| 2408 | + |
| 2409 | + for relation in set(relations): |
| 2410 | + first_key, second_key = relation.split(SEPARATOER) |
| 2411 | + if ( |
| 2412 | + linked_points_and_color_to_key_map.get(first_key) is None |
| 2413 | + or linked_points_and_color_to_key_map.get(second_key) is None |
| 2414 | + ): |
| 2415 | + continue |
| 2416 | + line_start_point = linked_points_and_color_to_key_map.get( |
| 2417 | + first_key |
| 2418 | + )["point"] |
| 2419 | + line_end_point = linked_points_and_color_to_key_map.get(second_key)[ |
| 2420 | + "point" |
| 2421 | + ] |
| 2422 | + relation_line_points = line_start_point + line_end_point |
| 2423 | + |
| 2424 | + draw_img.line( |
| 2425 | + relation_line_points, fill=rgba_dark, width=stroke_width |
| 2426 | + ) |
| 2427 | + |
| 2428 | + for key in linked_points_and_color_to_key_map: |
| 2429 | + x, y = linked_points_and_color_to_key_map[key]["point"] |
| 2430 | + points = [ |
| 2431 | + x - stroke_width, |
| 2432 | + y - stroke_width, |
| 2433 | + x + stroke_width, |
| 2434 | + y + stroke_width, |
| 2435 | + ] |
| 2436 | + draw_img.ellipse( |
| 2437 | + points, |
| 2438 | + fill=linked_points_and_color_to_key_map[key]["keypoint_rgb"], |
| 2439 | + width=stroke_width, |
| 2440 | + ) |
| 2441 | + |
| 2442 | + if is_seg: |
| 2443 | + # For segmentation, merge the individual mask images with a bitwise OR |
| 2444 | + mask_seg_ground = Image.new("RGBA", (width, height), (0, 0, 0, 0)) |
| 2445 | + mask_seg = np.array(mask_seg_ground) |
| 2446 | + for seg_mask_image in seg_mask_images: |
| 2447 | + mask_seg = mask_seg | seg_mask_image |
| 2448 | + |
| 2449 | + # Alpha-blend the original image and the segmentation mask |
| 2450 | + np_img = np.array(img.convert("RGBA")) |
| 2451 | + merged_seg = np_img * 0.5 + mask_seg * 0.5 |
| 2452 | + # Composite: 'merged_seg' is rendered in the annotated area; the |
| 2453 | + # rest, selected via 'mask_seg', keeps the original image pixels |
| 2454 | + img = Image.composite( |
| 2455 | + Image.fromarray(merged_seg.astype(np.uint8)), |
| 2456 | + Image.fromarray(np_img.astype(np.uint8)), |
| 2457 | + Image.fromarray(mask_seg.astype(np.uint8)), |
| 2458 | + ) |
| 2459 | + |
| 2460 | + # To export with the original extension, convert to RGB unless the image is PNG |
| 2461 | + if os.path.splitext(img_file_path)[1].lower() != ".png": |
| 2462 | + img = img.convert("RGB") |
| 2463 | + # Save annotated content |
| 2464 | + output_file_path = os.path.join(output_dir, task["name"]) |
| 2465 | + os.makedirs(os.path.dirname(output_file_path), exist_ok=True) |
| 2466 | + img.save(output_file_path, quality=95) |
| 2467 | + |
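The segmentation branch above works in three steps: each annotation is rasterized into its own RGBA mask with cv2.fillPoly (holes are punched by filling the reversed rings with transparent black), the masks are merged with a bitwise OR, and the merged mask is alpha-blended 50/50 with the source image, with Image.composite using the mask's alpha band so unannotated pixels keep the original image. A minimal sketch of that final compositing step, assuming a ready-made RGBA mask array:

    import numpy as np
    from PIL import Image

    def composite_segmentation(img: Image.Image, mask_seg: np.ndarray) -> Image.Image:
        # 50/50 blend of image and mask, shown only where the mask's
        # alpha channel is set; elsewhere the original pixels win.
        np_img = np.array(img.convert("RGBA"))
        merged_seg = np_img * 0.5 + mask_seg * 0.5
        return Image.composite(
            Image.fromarray(merged_seg.astype(np.uint8)),
            Image.fromarray(np_img.astype(np.uint8)),
            Image.fromarray(mask_seg.astype(np.uint8)),  # alpha band acts as the mask
        )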
| 2468 | + def export_image_with_annotations( |
| 2469 | + self, |
| 2470 | + tasks: list, |
| 2471 | + image_dir: str, |
| 2472 | + output_dir: str = os.path.join("output", "images_with_annotations"), |
| 2473 | + ) -> None: |
| 2474 | + """ |
| 2475 | + Export images with their annotations rendered on top |
| 2476 | + """ |
| 2477 | + target_file_candidate_paths = glob.glob( |
| 2478 | + os.path.join(image_dir, "**"), recursive=True |
| 2479 | + ) |
| 2480 | + img_file_paths = [] |
| 2481 | + for target_file_candidate_path in target_file_candidate_paths: |
| 2482 | + if not os.path.isfile(target_file_candidate_path): |
| 2483 | + continue |
| 2484 | + if not target_file_candidate_path.lower().endswith( |
| 2485 | + EXPORT_IMAGE_WITH_ANNOTATIONS_SUPPORTED_IMAGE_TYPES |
| 2486 | + ): |
| 2487 | + continue |
| 2488 | + img_file_paths.append(target_file_candidate_path) |
| 2489 | + img_file_paths.sort() |
| 2490 | + |
| 2491 | + img_file_path_task_list = [] |
| 2492 | + for img_file_path in img_file_paths: |
| 2493 | + slashed_img_file_path = img_file_path.replace(os.path.sep, "/") |
| 2494 | + task_name = ( |
| 2495 | + slashed_img_file_path.replace(image_dir + "/", "") |
| 2496 | + if not image_dir.endswith("/") |
| 2497 | + else slashed_img_file_path.replace(image_dir, "") |
| 2498 | + ) |
| 2499 | + task = next( |
| 2500 | + filter(lambda x: x["name"] == task_name, tasks), |
| 2501 | + None, |
| 2502 | + ) |
| 2503 | + if not task: |
| 2504 | + logger.info(f"Task not found. filepath: {task_name}") |
| 2505 | + continue |
| 2506 | + img_file_path_task_list.append([img_file_path, task, output_dir]) |
| 2507 | + |
| 2508 | + with ThreadPoolExecutor(max_workers=4) as executor: |
| 2509 | + executor.map(self.__create_image_with_annotation, img_file_path_task_list) |
| 2510 | + |
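Taken together, export_image_with_annotations globs the supported images under image_dir, matches each image's path relative to image_dir against a task name, and renders the annotated copies into output_dir on four worker threads. A minimal usage sketch (the project slug is hypothetical; it assumes an access token is already configured for the client):

    import fastlabel

    client = fastlabel.Client()
    # Fetch image tasks, then draw their annotations onto the local images.
    tasks = client.get_image_tasks(project="example-project")
    client.export_image_with_annotations(tasks=tasks, image_dir="images")
    # Annotated copies land in output/images_with_annotations by default.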
2228 | 2511 | # Annotation |
2229 | 2512 |
|
2230 | 2513 | def find_annotation(self, annotation_id: str) -> dict: |