As arquiteturas de dados modernas utilizam soluções de captura, transformação, movimentação e carregamento de dados em tempo real para construir data lakes, data warehouses analíticos e repositórios de big data. Isso permite a análise de dados de diversas fontes sem impactar as operações que os utilizam. Para alcançar esse objetivo, é essencial estabelecer um fluxo de dados contínuo, escalável, elástico e robusto. O método mais comum para isso é a técnica de CDC (Change Data Capture). O CDC monitora a produção de pequenos conjuntos de dados, captura esses dados automaticamente e os entrega a um ou mais destinatários, incluindo repositórios de dados analíticos. O principal benefício é a eliminação do atraso D+1 na análise, já que os dados são detectados na origem assim que são produzidos e, posteriormente, replicados para o destino.
Este artigo demonstrará as duas fontes de dados mais comuns para cenários de CDC, tanto como origem quanto como destino. Para a origem dos dados, exploraremos o CDC em bancos de dados SQL e arquivos CSV. Para o destino dos dados, utilizaremos um banco de dados colunar (um cenário típico de banco de dados analítico de alto desempenho) e um tópico do Kafka (uma abordagem padrão para transmitir dados para a nuvem e/ou para vários consumidores de dados em tempo real).
Visão Breve
Este artigo fornecerá um exemplo para o seguinte cenário de interoperabilidade:
- O SQLCDCAdapter utilizará o SQLInboundAdapter para detectar novos registros no banco de dados SQL e extraí-los com a ajuda de uma conexão JDBC e da linguagem SQL.
- O SQLCDCAdapter encapsulará os dados capturados em uma mensagem e a enviará para o CDCProcess (um Processo de Negócio usando a notação BPL).
- O CDCProcess recebe os dados SQL como uma mensagem e utiliza a Operação SQL para persistir os dados no IRIS e a Operação Kafka para transmitir os dados capturados para um tópico do Kafka.
- A Operação SQL persistirá os dados da mensagem em uma Classe Persistente do InterSystems IRIS modelada como armazenamento colunar.
- O armazenamento colunar é uma opção que oferece desempenho superior em consultas para dados analíticos.
- A Operação Kafka transformará a mensagem em JSON e a enviará para um tópico do Kafka, onde um data lake na nuvem ou qualquer outro assinante poderá consumi-la.
- Esses fluxos de dados são executados em tempo real, estabelecendo um fluxo de dados contínuo.
- O Serviço BAM calculará as métricas de negócios a partir da tabela colunar em tempo real. Um painel de BI exibirá as métricas de negócios resultantes para o usuário instantaneamente.
Instalando o exemplo
O pacote iris-cdc-sample (https://openexchange.intersystems.com/package/iris-cdc-sample) é um aplicativo de exemplo que implementa o cenário descrito acima. Para instalá-lo, siga estas etapas:
1. Clone/git pull o repositório em qualquer diretório local:
$ git clone https://github.com/yurimarx/iris-cdc-sample.git
2. Abra o terminal neste diretório e execute o comando abaixo:
$ docker-compose build
3. Execute o contêiner IRIS com seu projeto:
$ docker-compose up -d
Componentes do exemplo
Este exemplo utiliza os seguintes contêineres:
- iris: Plataforma InterSystems IRIS, incluindo:
- Banco de Dados Colunar IRIS (para armazenar os dados capturados).
- Interoperabilidade IRIS com um ambiente de produção para executar o processo de CDC (captura de dados de alteração). A produção captura dados de um banco de dados externo (PostgreSQL), os persiste no IRIS e os transmite para um tópico do Kafka.
- IRIS BAM (Monitoramento de Atividade Comercial) para calcular métricas de vendas em tempo real por produto e exibi-las em um painel.
- salesdb: Um banco de dados PostgreSQL contendo dados de vendas a serem capturados em tempo real.
- zookeeper: Um serviço usado para gerenciar o broker Kafka.
- kafka: O broker Kafka com o tópico de vendas, utilizado para receber e distribuir dados de vendas como eventos em tempo real.
- kafka-ui: Uma interface web do Kafka para administração e operação de tópicos e eventos.
services:
iris:
build:
context: .
dockerfile: Dockerfile
restart: always
command: --check-caps false --ISCAgent false
ports:
- 1972
- 52795:52773
- 53773
volumes:
- ./:/home/irisowner/dev/
networks:
- cdc-network
salesdb:
image: postgres:14-alpine
container_name: sales_db
restart: always
environment:
POSTGRES_USER: sales_user
POSTGRES_PASSWORD: welcome1
POSTGRES_DB: sales_db
volumes:
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
- postgres_data:/var/lib/postgresql/data/
ports:
- "5433:5432"
networks:
- cdc-network
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
hostname: zookeeper
networks:
- cdc-network
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka
hostname: kafka
networks:
- cdc-network
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
hostname: kafka-ui
networks:
- cdc-network
ports:
- "8080:8080"
depends_on:
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local_kafka_cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
volumes:
postgres_data:
driver: local
networks:
cdc-network:
driver: bridge
Criando uma tabela colunar
Tabelas colunares são utilizadas para armazenar dados não normalizados, como os seguintes:
|
Nome do produto |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Como os valores de Nome do Produto e Nome da Loja se repetem com frequência, armazenar os dados em formato colunar (como colunas em vez de linhas) conserva espaço de armazenamento e proporciona um desempenho superior na recuperação de dados. Historicamente, esse tipo de processamento exigia a criação de cubos de BI. No entanto, o armazenamento colunar resolve esse problema, eliminando a necessidade de replicar dados operacionais em cubos.
Agora, siga estas etapas para criar a tabela colunar de Vendas para o nosso exemplo:
1. Crie uma nova classe ObjectScript chamada Sales dentro do pacote dc.cdc.
2. Escreva o seguinte código-fonte:
Class dc.cdc.Sales Extends %Persistent [ DdlAllowed, Final ]
{
Parameter STORAGEDEFAULT = "columnar";
Parameter USEEXTENTSET = 1;
Property ProductName As %String;
Property StoreName As %String;
Property SalesValue As %Double;
}
3. O parâmetro STORAGEDEFAULT = "columnar" configura a tabela dc_cdc.Sales para usar columnar storage, ao invés de usar linha.
Criando a operação comercial para salvar os dados capturados
Após capturar os dados de vendas em um StreamContainer usando o SalesSqlService (nenhuma implementação é necessária; a configuração é feita no ambiente de produção na seção "Executando CDC"), precisamos de uma Operação de Negócios para processar o StreamContainer, extrair os dados de vendas do PostgreSQL e salvá-los na tabela Sales. Execute os passos abaixo:
1. Crie a classe SalesOperation dentro do pacote dc.cdc.
2. Escreva o código-fonte abaixo:
Class dc.cdc.SalesOperation Extends Ens.BusinessOperation
{
Method ProcessSalesData(pRequest As Ens.StreamContainer, Output pResponse As Ens.StringResponse) As %Status
{
Set tSC = $$$OK
Set pResponse = ##class(Ens.StringResponse).%New()
Try {
Set tStream = pRequest.Stream
Do tStream.Rewind()
Set content = ""
While 'tStream.AtEnd {
Set content = content _ tStream.Read(4096)
}
Set tDynamicObject = {}.%FromJSON(content)
Set sales = ##class(dc.cdc.Sales).%New()
Set sales.ProductName = tDynamicObject."product_name"
Set sales.StoreName = tDynamicObject."store_name"
Set sales.SalesValue = tDynamicObject."sales_value"
Set tSC = sales.%Save()
Set pResponse.StringValue = tDynamicObject.%ToJSON()
} Catch (ex) {
Set tSC = ex.AsStatus()
Set pResponse.StringValue = "Error while saving sales data!"
$$$LOGERROR("Error while saving sales data: " _ ex.DisplayString())
}
Quit tSC
}
XData MessageMap
{
<MapItems>
<MapItem MessageType="Ens.StreamContainer">
<Method>ProcessSalesData</Method>
</MapItem>
</MapItems>
}
}
3. O método ProcessSalesData receberá mensagens do tipo StreamContainer (devido à definição de MessageMap).
4. O método lerá os dados de vendas capturados em uma string JSON, carregará o JSON em um DynamicObject, criará um objeto Sales, definirá os valores de suas propriedades e o salvará na tabela Sales.
5. Finalmente, o método retornará a string JSON representando os dados de vendas na resposta.
Criando o serviço do BAM para monitorar vendas (Sales)
O InterSystems IRIS para Interoperabilidade inclui a funcionalidade BAM, permitindo monitorar dados de negócios em tempo real processados em produção usando um Painel de Análise. Para criar o serviço BAM, siga os passos abaixo:
1. Crie uma nova classe chamada SalesMetric que estenda Ens.BusinessMetric no pacote dc.cdc.
2. Escreva o seguinte código-fonte:
Class dc.cdc.SalesMetric Extends Ens.BusinessMetric
{
Property TotalSales As Ens.DataType.Metric(UNITS = "$US") [ MultiDimensional ];
Query MetricInstances() As %SQLQuery
{
SELECT distinct(ProductName) FROM dc_cdc.Sales
}
/// Calculate and update the set of metrics for this class
Method OnCalculateMetrics() As %Status
{
Set product = ..%Instance
Set SalesSum = 0.0
&sql(
select sum(SalesValue) into :SalesSum from dc_cdc.Sales where ProductName = :product
)
Set ..TotalSales = SalesSum
Quit $$$OK
}
}
3. A propriedade TotalSales permite o monitoramento em tempo real da soma das vendas por produto.
4. A propriedade Query MetricInstances define quais produtos devem ser monitorados.
5. O método OnCalculateMetrics calcula a soma das vendas para cada produto.
6. Esta classe será utilizada em um painel para gerar o total de vendas por produto em tempo real.
Executando o Processo e Produção do CDC - Change Data Capture
Nosso diagrama de produção final, com todos os processos ETL (Extração, Transformação e Carga) necessários, é mostrado abaixo:
Siga os próximos passos:
1. Acesse o ambiente de produção do CDC: http://localhost:52795/csp/user/EnsPortal.ProductionConfig.zen?PRODUCTIO...
2. Crie um novo EnsLib.JavaGateway.Service chamado Java (necessário para o SalesSqlService).
3. Gere um serviço de negócios chamado SalesSqlService (SQLCDCService) e configure os seguintes parâmetros:
a. DSN (string de conexão para PostgreSQL): jdbc:postgresql://sales_db:5432/sales_db.
b. Credenciais: Crie um pg_cred com o nome de usuário (sales_user) e a senha (welcome1) para acessar o PostgreSQL.
c. Nomes de configuração de destino: SalesProcess (o processo do CDC).
d. Consulta (para selecionar os dados a serem consumidos): select * from sales.
e. Nome do Campo Chave (a coluna que o IRIS usa para rastrear linhas já capturadas): id.
f. Serviço Java Gateway (obrigatório porque o adaptador CDC usa JDBC): Java (Java Gateway para esta produção).
g. Driver JDBC: org.postgresql.Driver.
h. Classpath JDBC (um driver para conectar com o PostgreSQL, copiado via script Dockerfile): /home/irisowner/dev/postgresql-42.7.8.jar.
4. Crie uma nova instância de dc.cdc.SalesMetric chamada SalesMetric.
5. Gere uma nova instância de EnsLib.Kafka.Operation e nomeie-a como SalesKafkaOperation (Operação Kafka) com os seguintes parâmetros:
a. ClientID: iris
b. Servers: kafka:9092
6. Crie uma nova instância de dc.cdc.SalesOperation chamada SalesOperation.
7. Desenvolva um Processo de Negócio chamado SalesProcess. A lógica de implementação do BPL deve ser a seguinte:
a. Diagrama final:
b. Crie 2 propriedades no Context:
i. `Sales` com o tipo `Ens.StringResponse` para armazenar os dados de vendas como uma string JSON.
ii. `KafkaMessage` com o tipo `EnsLib.Kafka.Message` (para ser usado para enviar os dados capturados para o tópico `sales-topic` do Kafka).
c. Gere uma chamada, salve na tabela `Sales` e defina o seguinte:
i. Destino: `SalesOperation`
ii. Classe da mensagem de solicitação: `Ens.StreamContainer` (dados capturados como fluxo)
iii. Ações da solicitação:
iv. Classe da mensagem de resposta: Ens.StringResponse (o fluxo será convertido em uma representação em string JSON dos dados capturados)
v. Ações de resposta:
d. Crie um bloco de código e escreva um código ObjectScript que preencha a mensagem do Kafka com as propriedades necessárias para que os dados de vendas (como uma string JSON) sejam publicados como um evento no tópico de vendas do broker do Kafka:
Set context.KafkaMessage.topic = "sales-topic"
Set context.KafkaMessage.value = context.Sales.StringValue
Set context.KafkaMessage.key = "iris"
e. Crie uma chamada para o tópico de vendas do Kafka e defina o seguinte design:
i. Destino: SalesKafkaOperation
ii. Classe da mensagem de solicitação: %Library.Persistent (KafkaMessage é persistente)
iii. Ações da solicitação:
f. Crie uma atribuição chamada "Enviar Resposta" com o seguinte conteúdo:
i. Propriedade: response.StringValue
ii. Valor: "Processo finalizado!"
Vendo os resultados do CDC
Após habilitar o CDCProduction, insira alguns registros na tabela de vendas do PostgreSQL usando sua ferramenta de administração de banco de dados (DBeaver ou PgAdmin) e observe os resultados das mensagens de produção.
Consulte o diagrama de sequência para entender o processo do CDC (clique em qualquer link do cabeçalho da mensagem):
Visualizando o monitoramento do BAM em um painel de análise
Ao capturar dados em tempo real, você naturalmente deseja visualizar os resultados instantaneamente em um painel. Siga os passos abaixo para conseguir isso:
1. Acesse Analytics > Portal do Usuário (User Portal):
2. Clique Add Dashboard:
3. Defina as propriedades abaixo e clique em OK:
a. Pasta: Ens/Analytics
b. Nome do painel: Sales BAM
4. Clique em Widgets:
5. Clique o botão mais:
6. Configure the Widget as shown below:
7. Ajuste o novo widget para cobrir toda a área do painel.

8. Agora, selecione WidgetSales:
9. Escolha Controls:
10. Clique no botão mais:
11. Configure o controle conforme ilustrado abaixo (para visualizar o total de vendas em tempo real, com atualização automática):

12. Agora, quando novos valores forem capturados, o painel exibirá imediatamente os valores atualizados para TotalSales.
Para aprender mais:
A documentação da InterSystems pode ajudá-lo a aprofundar seus conhecimentos em CDC, BAM, Kafka e interoperabilidade. Visite as páginas abaixo para saber mais:
- BAM: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=EGIN_options#EGIN_options_bam
- Kafka: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=ITECHREF_kafka
- Adaptadores SQL (CDC para tabelas SQL): https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=ESQL_intro
- Criando produções de ETL/CDC: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=PAGE_interop_languages
- BPL (low code para processos de negócio): https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=EBPL_use
.png)
.png)
.png)
.png)
.png)
.png)
.png)
.png)
.png)
.png)
.png)