Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public interface PostRepository extends JpaRepository<Post, Long> {

boolean existsByUrl(String url);
@Query("SELECT p.url FROM Post p WHERE p.url IN :urls")
Set<String> findExistingUrls(@Param("urls") List<String> urls);

@Query("""
SELECT p FROM Post p
Expand Down
77 changes: 36 additions & 41 deletions src/main/java/com/techfork/domain/source/batch/RssFeedReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.rometools.rome.feed.synd.SyndFeed;
import com.rometools.rome.io.SyndFeedInput;
import com.rometools.rome.io.XmlReader;
import com.techfork.domain.post.repository.PostRepository;
import com.techfork.domain.source.dto.RssFeedItem;
import com.techfork.domain.source.entity.TechBlog;
import com.techfork.domain.source.repository.TechBlogRepository;
Expand All @@ -21,7 +22,8 @@
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Set;
import java.util.stream.Stream;

@Slf4j
@Component
Expand All @@ -30,59 +32,52 @@
public class RssFeedReader implements ItemReader<RssFeedItem> {

private final TechBlogRepository techBlogRepository;
private final PostRepository postRepository;
private final WebClient webClient;

private ConcurrentLinkedQueue<RssFeedItem> itemQueue;
private List<RssFeedItem> items;
private int currentIndex = 0;

@Override
public RssFeedItem read() {
// 첫 실행 시 모든 RSS 아이템을 큐에 추가
if (itemQueue == null) {
initializeQueue();
if (items == null) {
initializeItems();
}

// 큐에서 아이템 꺼내기 (Thread-Safe)
RssFeedItem item = itemQueue.poll();

if (item == null) {
log.info("모든 RSS 피드 수집 완료");
if (currentIndex >= items.size()) {
log.info("모든 RSS 피드 수집 완료: 총 {}개", items.size());
return null;
}

return item;
return items.get(currentIndex++);
}

/**
* 모든 RSS 피드를 미리 수집하여 큐에 저장
* 한 번만 실행되며, 여러 스레드가 큐에서 안전하게 아이템을 가져감
*/
private synchronized void initializeQueue() {
// Double-checked locking
if (itemQueue != null) {
return;
}

itemQueue = new ConcurrentLinkedQueue<>();
private void initializeItems() {
List<TechBlog> techBlogs = techBlogRepository.findAll();
log.info("총 {}개 테크 블로그 RSS 수집 시작", techBlogs.size());

int totalItems = 0;
for (TechBlog techBlog : techBlogs) {
try {
List<RssFeedItem> items = fetchRssFeed(techBlog);
if (!items.isEmpty()) {
itemQueue.addAll(items);
totalItems += items.size();
log.info("[{}] RSS 수집 성공: {}개 아이템", techBlog.getCompanyName(), items.size());
} else {
log.warn("[{}] RSS 피드에 아이템이 없습니다", techBlog.getCompanyName());
}
} catch (Exception e) {
log.error("[{}] RSS 수집 실패: {}", techBlog.getCompanyName(), e.getMessage(), e);
// 실패해도 다음 블로그 계속 처리
}
}

log.info("RSS 수집 초기화 완료: 총 {}개 아이템을 큐에 추가", totalItems);
List<RssFeedItem> allItems = techBlogs.parallelStream()
.flatMap(techBlog -> {
try {
List<RssFeedItem> feedItems = fetchRssFeed(techBlog);
log.info("[{}] RSS 수집 성공: {}개", techBlog.getCompanyName(), feedItems.size());
return feedItems.stream();
} catch (Exception e) {
log.error("[{}] RSS 수집 실패: {}", techBlog.getCompanyName(), e.getMessage());
return Stream.empty();
}
})
.toList();

Set<String> existingUrls = postRepository.findExistingUrls(
allItems.stream().map(RssFeedItem::url).toList()
);

items = allItems.stream()
.filter(item -> !existingUrls.contains(item.url()))
.toList();

log.info("RSS 수집 초기화 완료: 총 {}개 아이템", items.size());
}

private List<RssFeedItem> fetchRssFeed(TechBlog techBlog) throws Exception {
Expand Down Expand Up @@ -165,4 +160,4 @@ private LocalDateTime convertToLocalDateTime(Date date) {
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.techfork.domain.source.batch;

import com.techfork.domain.post.entity.Post;
import com.techfork.domain.post.repository.PostRepository;
import com.techfork.domain.source.dto.RssFeedItem;
import com.techfork.domain.source.entity.TechBlog;
import com.techfork.domain.source.repository.TechBlogRepository;
Expand All @@ -13,29 +12,18 @@

/**
* RssFeedItem을 Post 엔티티로 변환하는 Processor
* 중복 체크도 여기서 수행하여 이미 존재하는 URL은 null 반환
*/
@Slf4j
@Component
@StepScope
@RequiredArgsConstructor
public class RssToPostProcessor implements ItemProcessor<RssFeedItem, Post> {

private final PostRepository postRepository;
private final TechBlogRepository techBlogRepository;

@Override
public Post process(RssFeedItem item) {
// 중복 체크
if (postRepository.existsByUrl(item.url())) {
log.debug("중복 URL 스킵: {}", item.url());
return null; // null 반환 시 Writer에서 처리 안 함
}

TechBlog techBlog = techBlogRepository.findById(item.techBlogId())
.orElseThrow(() -> new IllegalStateException(
"TechBlog를 찾을 수 없습니다. ID: " + item.techBlogId()));

TechBlog techBlog = techBlogRepository.getReferenceById(item.techBlogId());
return Post.create(item, techBlog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ public Step fetchAndSaveRssStep() {
.reader(rssFeedReader)
.processor(rssToPostProcessor)
.writer(postBatchWriter)
// 병렬 처리: 5개 스레드로 동시에 RSS 수집
.taskExecutor(rssTaskExecutor())
.faultTolerant()
// 건너뛰기 정책: 최대 10개 아이템까지 건너뛰기 허용
.skipLimit(10)
Expand Down Expand Up @@ -118,19 +116,6 @@ public Step embedAndIndexStep() {
.build();
}

@Bean
public TaskExecutor rssTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(20);
executor.setThreadNamePrefix("rss-crawl-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}

@Bean
public TaskExecutor embeddingTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

/**
* RSS 크롤링 스케줄러
* - 1시간마다 RSS 피드 크롤링 실행
* - 24시간마다 RSS 피드 크롤링 실행
* - Redis 분산 락으로 중복 실행 방지
*/
@Slf4j
Expand All @@ -31,7 +31,7 @@ public class RssCrawlingScheduler {

/**
* 매일 오전 5시마다 RSS 크롤링 실행
* cron: 0 0 5 * * * -> 매 시간 정각
* cron: 0 0 5 * * * -> 매일 오전 5시
*/
@Scheduled(cron = "0 0 5 * * *")
public void scheduleCrawling() {
Expand All @@ -50,17 +50,11 @@ public void scheduleCrawling() {
log.error("Unexpected error during scheduled crawling", e);
} finally {
distributedLock.unlock(CRAWLING_LOCK_KEY, lockValue);
cleanupStaleHistories();
}
}

/**
* 5분마다 오래된 RUNNING 상태의 이력을 정리 (좀비 프로세스 방지)
* cron: 매 5분마다 실행
*/
@Scheduled(cron = "0 */5 * * * *")
public void cleanupStaleHistories() {
log.debug("Checking for stale crawling histories");

private void cleanupStaleHistories() {
var staleHistories = crawlingHistoryRepository.findByStatusAndStartedAtBefore(
ECrawlingStatus.RUNNING, java.time.LocalDateTime.now().minusHours(1)
);
Expand Down
49 changes: 16 additions & 33 deletions src/main/java/com/techfork/global/config/WebClientConfig.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package com.techfork.global.config;

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.channel.ChannelOption;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;

import javax.net.ssl.SSLException;
import java.time.Duration;


Expand All @@ -20,34 +17,20 @@ public class WebClientConfig {

@Bean
public WebClient webClient() {
try {
// SSL Context 생성 - 모든 인증서 신뢰
SslContext sslContext = SslContextBuilder
.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();

// HttpClient 설정 (Netty 기반)
HttpClient httpClient = HttpClient.create()
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext))
.responseTimeout(Duration.ofSeconds(30))
.followRedirect(true); // Redirect 자동 추적

// WebClient 생성
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.defaultHeader("User-Agent", "Mozilla/5.0 (compatible; TechFork-Bot/1.0)")
.defaultHeader("Accept", "application/rss+xml, application/xml, application/atom+xml, text/xml, */*")
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(10 * 1024 * 1024))
.build();

return webClient;

} catch (SSLException e) {
log.error("WebClient 초기화 실패", e);
throw new RuntimeException("WebClient 초기화 실패", e);
}
// HttpClient 설정 (Netty 기반)
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) // 연결 타임아웃
.responseTimeout(Duration.ofSeconds(30)) // 응답 타임아웃
.followRedirect(true); // Redirect 자동 추적

// WebClient 생성
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.defaultHeader("User-Agent", "Mozilla/5.0 (compatible; TechFork-Bot/1.0)") // 봇 차단 방지
.defaultHeader("Accept", "application/rss+xml, application/xml, application/atom+xml, text/xml, */*") // RSS/XML 콘텐츠 명시
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(10 * 1024 * 1024)) // 큰 RSS 피드 처리 가능
.build();
}
}
Loading