@@ -21,11 +21,12 @@
 
 from .pagination import DEFAULT_PAGE_SIZE, paginated_format
 from .models import (
+    TYPE_ALL_CRAWL_STATES,
     CrawlConfigIn,
     ConfigRevision,
     CrawlConfig,
     CrawlConfigOut,
-    CrawlConfigTags,
+    TagsResponse,
     CrawlOut,
     CrawlOutWithResources,
     UpdateCrawlConfig,
@@ -52,6 +53,7 @@
     ListFilterType,
     ScopeType,
     Seed,
+    Profile,
 )
 from .utils import (
     dt_now,
@@ -147,6 +149,8 @@ def __init__(
             minutes=int(os.environ.get("PAUSED_CRAWL_LIMIT_MINUTES", "10080"))
         )
 
+        self.crawl_queue_limit_scale = int(os.environ.get("CRAWL_QUEUE_LIMIT_SCALE", 0))
+
         self.router = APIRouter(
             prefix="/crawlconfigs",
             tags=["crawlconfigs"],
@@ -196,6 +200,14 @@ async def init_index(self):
             [("oid", pymongo.ASCENDING), ("tags", pymongo.ASCENDING)]
         )
 
+        await self.crawl_configs.create_index(
+            [
+                ("oid", pymongo.ASCENDING),
+                ("inactive", pymongo.ASCENDING),
+                ("profileid", pymongo.ASCENDING),
+            ]
+        )
+
         await self.crawl_configs.create_index(
             [("lastRun", pymongo.DESCENDING), ("modified", pymongo.DESCENDING)]
         )
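
Both profile-usage queries below filter on exactly the three indexed keys, which is what this new compound index serves. A minimal sketch of that query shape (field names from this diff; org_id and profile_ids are illustrative placeholders):

    # shape served by the (oid, inactive, profileid) index
    query = {
        "oid": org_id,                      # equality on the first indexed key
        "inactive": {"$ne": True},          # also matches docs missing the field
        "profileid": {"$in": profile_ids},  # equality or $in on the last key
    }
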
@@ -218,21 +230,6 @@ def sanitize(self, string=""):
         """sanitize string for use in wacz filename"""
         return self._file_rx.sub("-", string.lower())
 
-    async def get_profile_filename(
-        self, profileid: Optional[UUID], org: Organization
-    ) -> str:
-        """lookup filename from profileid"""
-        if not profileid:
-            return ""
-
-        profile_filename, _ = await self.profiles.get_profile_storage_path_and_proxy(
-            profileid, org
-        )
-        if not profile_filename:
-            raise HTTPException(status_code=400, detail="invalid_profile_id")
-
-        return profile_filename
-
     # pylint: disable=invalid-name, too-many-branches, too-many-statements, too-many-locals
     async def add_crawl_config(
         self,
@@ -732,6 +729,7 @@ async def get_crawl_configs(
         description: Optional[str] = None,
         tags: Optional[List[str]] = None,
         tag_match: Optional[ListFilterType] = ListFilterType.AND,
+        last_crawl_state: list[TYPE_ALL_CRAWL_STATES] | None = None,
         schedule: Optional[bool] = None,
         is_crawl_running: Optional[bool] = None,
         sort_by: str = "lastRun",
@@ -773,6 +771,9 @@ async def get_crawl_configs(
         if is_crawl_running is not None:
             match_query["isCrawlRunning"] = is_crawl_running
 
+        if last_crawl_state:
+            match_query["lastCrawlState"] = {"$in": last_crawl_state}
+
         # pylint: disable=duplicate-code
         aggregate: List[Dict[str, Union[object, str, int]]] = [
             {"$match": match_query},
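
For illustration, a listing filtered on last crawl state would extend match_query roughly like this (a sketch, assuming "failed" and "canceled" are valid crawl state values):

    match_query = {
        "oid": org.id,
        "inactive": {"$ne": True},
        "lastCrawlState": {"$in": ["failed", "canceled"]},
    }
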
@@ -842,10 +843,37 @@ async def get_crawl_configs(
     async def is_profile_in_use(self, profileid: UUID, org: Organization) -> bool:
         """return true/false if any active workflows exist with given profile"""
         res = await self.crawl_configs.find_one(
-            {"profileid": profileid, "inactive": {"$ne": True}, "oid": org.id}
+            {"oid": org.id, "inactive": {"$ne": True}, "profileid": profileid}
         )
         return res is not None
 
+    async def mark_profiles_in_use(self, profiles: List[Profile], org: Organization):
+        """mark which profiles are in use by querying and grouping crawlconfigs"""
+        profile_ids = [profile.id for profile in profiles]
+        cursor = self.crawl_configs.aggregate(
+            [
+                {
+                    "$match": {
+                        "oid": org.id,
+                        "inactive": {"$ne": True},
+                        "profileid": {"$in": profile_ids},
+                    }
+                },
+                {"$group": {"_id": "$profileid", "count": {"$sum": 1}}},
+            ]
+        )
+        results = await cursor.to_list()
+        in_use = set()
+        for res in results:
+            if res.get("count") > 0:
+                in_use.add(res.get("_id"))
+
+        for profile in profiles:
+            if profile.id in in_use:
+                profile.inUse = True
+
+        return profiles
+
     async def get_running_crawl(self, cid: UUID) -> Optional[CrawlOut]:
         """Return the id of currently running crawl for this config, if any"""
         # crawls = await self.crawl_manager.list_running_crawls(cid=crawlconfig.id)
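
A minimal usage sketch for the new helper, assuming a hypothetical profile-listing caller; only mark_profiles_in_use itself comes from this diff:

    profiles = await ops.mark_profiles_in_use(profiles, org)  # one aggregation
    in_use_ids = {p.id for p in profiles if p.inUse}          # vs. N find_one calls
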
@@ -1174,7 +1202,18 @@ async def run_now_internal(
         if crawlconfig.proxyId and not self.can_org_use_proxy(org, crawlconfig.proxyId):
             raise HTTPException(status_code=404, detail="proxy_not_found")
 
-        profile_filename = await self.get_profile_filename(crawlconfig.profileid, org)
+        await self.check_if_too_many_waiting_crawls(org)
+
+        profile_filename, profile_proxy_id = (
+            await self.profiles.get_profile_filename_and_proxy(
+                crawlconfig.profileid, org
+            )
+        )
+        if crawlconfig.profileid and not profile_filename:
+            raise HTTPException(status_code=400, detail="invalid_profile_id")
+
+        save_profile_id = self.get_save_profile_id(profile_proxy_id, crawlconfig)
+
         storage_filename = (
             crawlconfig.crawlFilenameTemplate or self.default_filename_template
         )
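
An illustrative summary of the new pre-flight order in run_now_internal (a recap, not code from the diff):

    # 1. check_if_too_many_waiting_crawls(org)         -> may raise HTTP 429
    # 2. profiles.get_profile_filename_and_proxy(...)  -> 400 if profileid is set
    #    but no stored filename is found
    # 3. get_save_profile_id(profile_proxy_id, ...)    -> "" disables auto-save
    #    when workflow and profile proxies differ
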
@@ -1204,6 +1243,7 @@ async def run_now_internal(
                 warc_prefix=self.get_warc_prefix(org, crawlconfig),
                 storage_filename=storage_filename,
                 profile_filename=profile_filename or "",
+                profileid=save_profile_id,
                 is_single_page=self.is_single_page(crawlconfig.config),
                 seed_file_url=seed_file_url,
             )
@@ -1215,6 +1255,40 @@ async def run_now_internal(
             print(traceback.format_exc())
             raise HTTPException(status_code=500, detail=f"Error starting crawl: {exc}")
 
+    def get_save_profile_id(
+        self, profile_proxy_id: str, crawlconfig: CrawlConfig
+    ) -> str:
+        """return profile id if profile should be auto-saved, or empty str if not"""
+        # if no profile, nothing to save
+        if not crawlconfig.profileid:
+            return ""
+
+        # if no proxies, allow saving
+        if not crawlconfig.proxyId and not profile_proxy_id:
+            return str(crawlconfig.profileid)
+
+        # if proxy ids match, allow saving
+        if crawlconfig.proxyId == profile_proxy_id:
+            return str(crawlconfig.profileid)
+
+        # otherwise, don't save
+        return ""
+
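A quick truth table for get_save_profile_id, assuming profileid is set (the proxy ids are hypothetical):

    # crawlconfig.proxyId  profile_proxy_id  -> returned value
    # ""                   ""                   str(profileid)  # no proxies involved
    # "proxy-a"            "proxy-a"            str(profileid)  # proxies match
    # "proxy-a"            ""                   ""              # mismatch, skip save
    # ""                   "proxy-b"            ""              # mismatch, skip save
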
+    async def check_if_too_many_waiting_crawls(self, org: Organization):
+        """if a max concurrent crawls quota is set, limit the number of queued
+        crawls to crawl_queue_limit_scale times that limit; raise 429 at the limit"""
+        max_concur = org.quotas.maxConcurrentCrawls
+        if not max_concur or not self.crawl_queue_limit_scale:
+            return
+
+        num_waiting = await self.crawls.count_documents(
+            {"oid": org.id, "state": "waiting_org_limit"}
+        )
+        if num_waiting < max_concur * self.crawl_queue_limit_scale:
+            return
+
+        raise HTTPException(status_code=429, detail="slow_down_too_many_crawls_queued")
+
     async def set_config_current_crawl_info(
         self, cid: UUID, crawl_id: str, crawl_start: datetime, user: User
     ):
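
Worked example with hypothetical numbers: maxConcurrentCrawls = 2 and CRAWL_QUEUE_LIMIT_SCALE = 3 allow up to 2 * 3 = 6 crawls in "waiting_org_limit"; the next run_now attempt gets 429 slow_down_too_many_crawls_queued. A scale of 0 (the default) disables the check entirely.
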
@@ -1567,6 +1641,10 @@ async def get_crawl_configs(
                 description='Defaults to `"and"` if omitted',
             ),
         ] = ListFilterType.AND,
+        last_crawl_state: Annotated[
+            list[TYPE_ALL_CRAWL_STATES] | None,
+            Query(alias="lastCrawlState", title="Last Crawl State"),
+        ] = None,
         schedule: Optional[bool] = None,
         is_crawl_running: Annotated[
             Optional[bool], Query(alias="isCrawlRunning", title="Is Crawl Running")
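
Since the parameter is a list, FastAPI accepts the query key repeated. A request might look like this (the path prefix and state values are assumptions for illustration):

    GET /api/orgs/{oid}/crawlconfigs?lastCrawlState=failed&lastCrawlState=canceled
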
@@ -1596,6 +1674,7 @@ async def get_crawl_configs(
             description=description,
             tags=tag,
             tag_match=tag_match,
+            last_crawl_state=last_crawl_state,
             schedule=schedule,
             is_crawl_running=is_crawl_running,
             page_size=page_size,
@@ -1612,7 +1691,7 @@ async def get_crawl_config_tags(org: Organization = Depends(org_viewer_dep)):
16121691 """
16131692 return await ops .get_crawl_config_tags (org )
16141693
1615- @router .get ("/tagCounts" , response_model = CrawlConfigTags )
1694+ @router .get ("/tagCounts" , response_model = TagsResponse )
16161695 async def get_crawl_config_tag_counts (org : Organization = Depends (org_viewer_dep )):
16171696 return {"tags" : await ops .get_crawl_config_tag_counts (org )}
16181697