Esta abordagem permitirá maior flexibilidade e escalabilidade no pipeline ETL. Cada novo container será responsável por um padrão específico e poderá ser atualizado e escalado independentemente.
- Criar novos containers para cada padrão (Lookup, SKG, SKP).
- Configurar novos tópicos Kafka para cada estágio do pipeline, garantindo o fluxo correto dos dados.
- Implementar serviços independentes que consumam e produzam para os tópicos corretos.
- Adicionar um serviço central de logging e erro para monitoramento.
- Raw Data (CSV, JSON, etc.) (Kafka Source Connector) → Pre-Processing-Service → Lookup-Service → SKG-Service → SKP-Service → Transform-Service → Kafka Sink Connector → Load (MySQL)
- Input (dados crus): Dados_CSV_Topic
- Output do PreProcessing: Preprocessed_Data_Topic
- Output do Lookup: lookup-output
- Output do SKG: skg-output
- Output do SKP: skp-output
- Transform-Service recebe do SKP: skp-output
- Transform-Service publica os dados finais: Dados_CSV_Topic_Modified_{subject_name}
- Assim, o transform-service recebe os dados já enriquecidos com IDs substitutos e pode focar apenas na formatação e carregamento.
- Agora os serviços não precisam mais consumir os tópicos dinâmicos do transform-service, mas sim os dados brutos antes da transformação. Isso significa que removemos a configuração KAFKA_INPUT_TOPIC do docker-compose.yml e movemos a lógica de roteamento para os próprios serviços Python.
- Os novos serviços (Lookup, SKG, SKP) agora consomem dos tópicos anteriores ao Transform.
- O transform-service passa a consumir do skp-output.
- O docker-compose.yml não precisa mais definir os tópicos de input dos serviços dinamicamente.
🚀 Benefícios dessa mudança ✅ Evita transformar os dados antes do enriquecimento ✅ Facilita a gestão dos tópicos no Kafka (sem necessidade de consumir tópicos dinâmicos no lookup, SKG e SKP) ✅ Melhora a modularidade: cada serviço tem uma única responsabilidade clara ✅ Reduz complexidade no docker-compose.yml, já que os serviços gerenciam seus próprios tópicos
Ajustes para o fluxo correto:
- Atualizar dependências entre serviços para refletir a nova ordem do pipeline.
- Garantir que os tópicos Kafka sejam consumidos na ordem correta.
- Manter todos os componentes existentes, apenas ajustando o fluxo dos dados.
Se cada serviço (PRE-PROCESSING, LOOKUP, SKG, SKP, TRANSFORM, LOGGING) tiver um group.id único, cada um consumirá todas as mensagens do seu tópico de input. Ou seja:
- O PRE-PROCESSING consumirá todas as mensagens de Dados_CSV_Topic.
- O LOOKUP consumirá todas as mensagens de Preprocessed_Data_Topic.
- O SKG consumirá todas as mensagens de Lookup_Data_Topic.
- O SKP consumirá todas as mensagens de SKG_Data_Topic.
- O TRANSFORM consumirá todas as mensagens de SKP_Data_Topic.
- O LOGGING consumirá todas as mensagens dos tópicos que forem configurados.
- Garantia de que cada serviço processa todas as mensagens necessárias.
- Mantém a independência e modularidade entre os serviços.
- Facilita o escalonamento horizontal de cada serviço separadamente.
Quando se altera o Dockerfile de um serviço ou os conteúdos da sua diretoria de trabalho, é necessário utilizar a chamada seguinte:
docker compose build
Após isto, é possível relançar os serviços através do comando:
docker-compose up -d
-
Consumer Group aplica-se ao tópico de entrada e não ao de saída? Sim, o consumer group afeta apenas os tópicos de entrada de cada serviço. Aqui está um resumo do funcionamento:
-
Cada serviço que consome mensagens de um tópico Kafka pertence a um consumer group específico.
-
Todos os consumidores dentro do mesmo grupo compartilham a carga das mensagens do mesmo tópico de entrada.
-
O tópico de saída (onde um serviço produz mensagens) não é afetado pelo consumer group, porque os produtores não pertencem a consumer groups — eles simplesmente enviam mensagens para um tópico.
Exemplo deste sistema:
- O
preprocessing-serviceconsome do tópico de inputDados_CSV_Topice produz para o tópico de outputPreprocessed_Data_Topic. - O
lookup-servicedeveria consumir dePreprocessed_Data_Topice produzir paraLookup_results. - Se o lookup-service não estiver consumindo mensagens, pode ser um problema no consumer (por exemplo, um erro no consumo ou commit de offset).
{ "schema": "musicdata", "id_natural": "Oi Balde - Ao Vivo", "surrogate_key": 1, "data": { "track_name": "Oi Balde - Ao Vivo", "artists_name": "Zé Neto & Cristiano", "artist_count": "1" } }
{ "schema": "musicdata", "surrogate_key": 1, "data": { "track_name": 1, "artists_name": "Zé Neto & Cristiano", "artist_count": "1" } }
- Lê mensagens do Kafka no tópico SKG_Results
- Verifica se os campos esperados estão presentes (schema, id_natural, surrogate_key, data)
- Substitui o id_natural pela surrogate_key no campo "data"
- Remove o campo id_natural do output
- Publica a mensagem modificada no tópico SKP_Results
- Isso garante que qualquer campo em "data" que originalmente tivesse o id_natural seja atualizado para o SK.
- O Tópico de Logs recebe corretamente os warnings e infos do tópico de Lookup, mas não está a criar o ficheiro JSON de logs.
- O serviço de Lookup conecta-se à BD (aparentemente), mas apenas gera warnings e nada no topico de Output.
- Questionar ao Professor se o Padrão de Lookup devia criar uma entrada na tabela de Lookup quando não existe a entrada ou se apenas no SKG é que se faz a geração dessas entradas? Acho que faz sentido não gerar no Lookup e gerar no SKG
- O serviço de Lookup, no fim, envia para o topico de Logging o log do seu resultado e envia o que obteve da tabela de lookup para o topico de Lookup_results. Faz sentido? Será que devia acrescentar algum campo ao schema? em vez disso? Ou mando esse resultado pro topico e na proxima etapa uso o mesmo topico de novo.
- A tabela de Lookup n está a ir corretamente, mas a de SK está, se calhar apago só a de lookup e uso sempre a mesma?
- Testar se o transform-service no projeto antigo (GenericETL), funciona com o Conector de Sink se mudar o .env de localhost para mysql (No transform-service n se usava o .env para a BD, apenas para o Kafka. O Conector Sink tem o endereço correto)