当前位置: 首页 > 图灵资讯 > 技术篇> Apache Camel 与 Quarkus 实用指南:构建 ETL 应用程序

Apache Camel 与 Quarkus 实用指南:构建 ETL 应用程序

来源:图灵教育
时间:2024-09-09 09:55:01

很高兴向大家介绍一系列相关的相关系列。 apache camel 文章。在第一篇文章中,我将介绍一个实用例来展示它的功能,而不是深入研究 apache camel 复杂性。具体来说,你将学习如何使用它 apache camel 在两个数据库之间创建简单的提取、转换和加载 (etl) 应用程序。

apache camel 简介 - 简要概述

在深入实际用例之前,我们先简单介绍一下 apache camel。 apache camel 它是一种利用企业集成模式的开源集成框架(eip)促进各种系统的集成。

在当今世界,许多不同类型的系统并存。有些可能是遗留系统,而另一些可能是新系统。由于不同的实现和新闻格式,这些系统通常需要相互交互和集成。一个解决方案是编写自定义代码来弥合这些差异,但这可能会导致紧密耦合和维护困难。

相反,apache camel 为调解系统之间的差异提供了额外的层,从而实现松散耦合,更容易维护。 camel 使用 api(或声明性 java 基于域特定语言的配置 eip 路由和中介规则。

企业集成模式 (eip)

要了解 apache camel,掌握“企业集成模式”(eip) 很重要。 《企业集成模式》一书描述了一组基于组件的大型系统设计模式,其中组件可以在同一过程或不同的机器上运行。关键思想是系统应该面向新闻,组件应该通过新闻通信。这些模式提供了用于实现这些通信的工具包(图) 1)。

Apache Camel 与 Quarkus 实用指南:构建 ETL 应用程序

图 1 – 集成解决方案的基本要素 (enterpriseintegrationpatterns.com)

apache camel 关键术语
  • 端点:端点是发送和接收信息的通道。它作为组件和外部世界之间的接口。

  • 消息:消息是由消息头和消息体组成的系统间通信的数据结构。标头包含元数据,文本包含实际数据。

  • 通道:通道连接两个端点,方便发送和接收消息。

  • 路由器:路由器将消息从一个端点定向另一个端点,以确定消息路径。

  • 翻译器:翻译器将信息从一种格式转换为另一种格式。

关于 apache camel 这就是我考虑的介绍。现在让我们向您展示如何使用它。 apache camel 在两个数据库之间创建一个简单的 etl 应用程序。

问题陈述

假设我们有一个高负载系统,其中一个关键组件是数据库。在某些情况下,我们需要在正常的操作案例之外处理这些数据 - 训练 ml 我们只需要数据的某些部分,模型,生成导出,图表。当然,这将给我们的操作数据库带来更大的负担。因此,最好有一种机制,我们可以提取必要的数据,将其转换为我们需要的形式,并将其存储在另一个数据库中 - 其他比操作性更好的。通过这一策略,我们解决了运营基地潜在超载的问题。而且,通过这种机制,我们可以在系统负载不太大的时候(比如晚上)进行这个操作。

解决方案概述

解决方案如下图所示(图2)。我们将使用 apache camel 在两个数据库之间创建简单的数据库 etl 应用程序。该应用程序将数据从源数据库中提取并转换,然后加载到目标数据库中。我们可以引入不同的策略来实现这个解决方案,重点关注如何从源数据库中提取数据。我假设基于记录的修改日期将选择数据的标准。该策略还提供了提取已修改数据的机会。

Apache Camel 与 Quarkus 实用指南:构建 ETL 应用程序

图 2 – 使用 apache camel 两个数据库之间的同步数据

源数据库和目标数据库将具有以下表结构:

create table if not exists user
(
    id            serial primary key,
    username      varchar(50)  not null,
    password      varchar(50)  not null,
    email         varchar(255) not null,
    created_at    timestamp default now()::timestamp without time zone,
    last_modified timestamp default now()::timestamp without time zone
);

在目标数据库中,我们将在插入之前将用户名转换为大写。

我们将为各种各样的事情做出贡献 camel 组件使用 camel quarkus 扩展。具体来说,我们将使用它 camel sql 组件与数据库交互。 sql组件支持sql查询、插入、更新和删除。

第一,创建一个扩展 routebuilder 并重写配置方法:

@applicationscoped
public class userroute extends routebuilder {
    @override
    public void configure() throws exception {
        // your code here
    }
}

这里不强制使用 @applicationscoped 但我更愿意表明这种类型是一种注释 cdi bean,并且应该由 cdi 容器管理。

正如我上面提到的,我们将使用它 camel sql 组件与数据库交互。我们需要配置camel 连接源数据库和目标数据库的sql组件。我们将使用 quarkus agroal 扩展配置数据源。 agroal 扩展为数据源提供了连接池。我们将在 application.properties 数据源配置在文件中。

#
# source database configuration
quarkus.datasource.source_db.db-kind=postgresql
quarkus.datasource.source_db.jdbc.url=jdbc:postgresql://localhost:5001/demo
quarkus.datasource.source_db.username=test
quarkus.datasource.source_db.password=password1
#
#
# target database configuration
quarkus.datasource.target_db.db-kind=postgresql
quarkus.datasource.target_db.jdbc.url=jdbc:postgresql://localhost:6001/demo
quarkus.datasource.target_db.username=test
quarkus.datasource.target_db.password=password1
#

现在我们可以配置camel 连接源数据库和目标数据库的sql组件。我们将使用它 sql 创建源数据库和目标数据库的组件 sql 端点。

sql 组件使用以下端点 uri 表示法:

sql:select * from table where id=# order by name[?options]

但是,我们需要机制来自动操作。我们每秒触发一次计时器组件 etl 过程。计时器组件用于在计时器触发时产生信息交换。计时器组件使用以下端点 uri 表示法:

timer:name[?options]

在我们的路线中,我们使用以下配置:

 from("timer://usersync?delay={{etl.timer.delay}}&period={{etl.timer.period}}")

{{etl.timer.delay}} 和 {{etl.timer.period}} 是我们将在 application.properties 文件中定义的配置值。

etl.timer.period=10000
etl.timer.delay=1000

在将数据插入目标数据库之前,我们需要提供翻译:

.process(exchange -> {
    final map<string object> rows = exchange.getin().getbody(map.class);

    final string username = (string) rows.get("username");

    final string usernametouppercase = username.touppercase();

    log.info("user name: {} converted to upper case: {}", username, usernametouppercase);

    rows.put("username", usernametouppercase);
})

</string>

用于实现信息交换或实现信息转换器等用例的处理器接口。

看,我们用 apache camel 在两个数据库之间建立一个简单的数据库 etl 应用程序。

在操作应用程序时,应在日志中看到以下输出:

2024-06-09 13:15:49,257 INFO  [route1] (Camel (camel-1) thread #1 - timer://userSync) Extracting Max last_modified value from source database
2024-06-09 13:15:49,258 INFO  [route1] (Camel (camel-1) thread #1 - timer://userSync) No record found in target database
2024-06-09 13:15:49,258 INFO  [route2] (Camel (camel-1) thread #1 - timer://userSync) The last_modified from source DB: 
2024-06-09 13:15:49,274 INFO  [route2] (Camel (camel-1) thread #1 - timer://userSync) Extracting records from source database
2024-06-09 13:15:49,277 INFO  [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: john_doe converted to upper case: JOHN_DOE
2024-06-09 13:15:49,282 INFO  [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: jane_smith converted to upper case: JANE_SMITH
2024-06-09 13:15:49,283 INFO  [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: alice_miller converted to upper case: ALICE_MILLER

您可以在 github 在存储库中找到应用程序的完整源代码。

结论

通过这个设置,我们使用它 apache camel 创造了一个简单的 etl 该应用程序从源数据库中提取数据,转换数据,然后加载到目标数据库中。该方法有助于减少操作数据库的负载,并允许我们在非高峰时间提取数据。

以上是Apache。 Camel 与 Quarkus 实用指南:构造 ETL 详情请关注图灵教育的其他相关文章!