Apache Camel 与 Quarkus 实用指南:构建 ETL 应用程序
2024-09-09 09:55:01
很高兴向大家介绍一系列相关的相关系列。 apache camel 文章。在第一篇文章中,我将介绍一个实用例来展示它的功能,而不是深入研究 apache camel 复杂性。具体来说,你将学习如何使用它 apache camel 在两个数据库之间创建简单的提取、转换和加载 (etl) 应用程序。
apache camel 简介 - 简要概述在深入实际用例之前,我们先简单介绍一下 apache camel。 apache camel 它是一种利用企业集成模式的开源集成框架(eip)促进各种系统的集成。
在当今世界,许多不同类型的系统并存。有些可能是遗留系统,而另一些可能是新系统。由于不同的实现和新闻格式,这些系统通常需要相互交互和集成。一个解决方案是编写自定义代码来弥合这些差异,但这可能会导致紧密耦合和维护困难。
相反,apache camel 为调解系统之间的差异提供了额外的层,从而实现松散耦合,更容易维护。 camel 使用 api(或声明性 java 基于域特定语言的配置 eip 路由和中介规则。
企业集成模式 (eip)要了解 apache camel,掌握“企业集成模式”(eip) 很重要。 《企业集成模式》一书描述了一组基于组件的大型系统设计模式,其中组件可以在同一过程或不同的机器上运行。关键思想是系统应该面向新闻,组件应该通过新闻通信。这些模式提供了用于实现这些通信的工具包(图) 1)。
图 1 – 集成解决方案的基本要素 (enterpriseintegrationpatterns.com)
apache camel 关键术语端点:端点是发送和接收信息的通道。它作为组件和外部世界之间的接口。
消息:消息是由消息头和消息体组成的系统间通信的数据结构。标头包含元数据,文本包含实际数据。
通道:通道连接两个端点,方便发送和接收消息。
路由器:路由器将消息从一个端点定向另一个端点,以确定消息路径。
翻译器:翻译器将信息从一种格式转换为另一种格式。
关于 apache camel 这就是我考虑的介绍。现在让我们向您展示如何使用它。 apache camel 在两个数据库之间创建一个简单的 etl 应用程序。
问题陈述假设我们有一个高负载系统,其中一个关键组件是数据库。在某些情况下,我们需要在正常的操作案例之外处理这些数据 - 训练 ml 我们只需要数据的某些部分,模型,生成导出,图表。当然,这将给我们的操作数据库带来更大的负担。因此,最好有一种机制,我们可以提取必要的数据,将其转换为我们需要的形式,并将其存储在另一个数据库中 - 其他比操作性更好的。通过这一策略,我们解决了运营基地潜在超载的问题。而且,通过这种机制,我们可以在系统负载不太大的时候(比如晚上)进行这个操作。
解决方案概述解决方案如下图所示(图2)。我们将使用 apache camel 在两个数据库之间创建简单的数据库 etl 应用程序。该应用程序将数据从源数据库中提取并转换,然后加载到目标数据库中。我们可以引入不同的策略来实现这个解决方案,重点关注如何从源数据库中提取数据。我假设基于记录的修改日期将选择数据的标准。该策略还提供了提取已修改数据的机会。
图 2 – 使用 apache camel 两个数据库之间的同步数据
源数据库和目标数据库将具有以下表结构:
create table if not exists user ( id serial primary key, username varchar(50) not null, password varchar(50) not null, email varchar(255) not null, created_at timestamp default now()::timestamp without time zone, last_modified timestamp default now()::timestamp without time zone );
在目标数据库中,我们将在插入之前将用户名转换为大写。
我们将为各种各样的事情做出贡献 camel 组件使用 camel quarkus 扩展。具体来说,我们将使用它 camel sql 组件与数据库交互。 sql组件支持sql查询、插入、更新和删除。
第一,创建一个扩展 routebuilder 并重写配置方法:
@applicationscoped public class userroute extends routebuilder { @override public void configure() throws exception { // your code here } }
这里不强制使用 @applicationscoped 但我更愿意表明这种类型是一种注释 cdi bean,并且应该由 cdi 容器管理。
正如我上面提到的,我们将使用它 camel sql 组件与数据库交互。我们需要配置camel 连接源数据库和目标数据库的sql组件。我们将使用 quarkus agroal 扩展配置数据源。 agroal 扩展为数据源提供了连接池。我们将在 application.properties 数据源配置在文件中。
# # source database configuration quarkus.datasource.source_db.db-kind=postgresql quarkus.datasource.source_db.jdbc.url=jdbc:postgresql://localhost:5001/demo quarkus.datasource.source_db.username=test quarkus.datasource.source_db.password=password1 # # # target database configuration quarkus.datasource.target_db.db-kind=postgresql quarkus.datasource.target_db.jdbc.url=jdbc:postgresql://localhost:6001/demo quarkus.datasource.target_db.username=test quarkus.datasource.target_db.password=password1 #
现在我们可以配置camel 连接源数据库和目标数据库的sql组件。我们将使用它 sql 创建源数据库和目标数据库的组件 sql 端点。
sql 组件使用以下端点 uri 表示法:
sql:select * from table where id=# order by name[?options]
但是,我们需要机制来自动操作。我们每秒触发一次计时器组件 etl 过程。计时器组件用于在计时器触发时产生信息交换。计时器组件使用以下端点 uri 表示法:
timer:name[?options]
在我们的路线中,我们使用以下配置:
from("timer://usersync?delay={{etl.timer.delay}}&period={{etl.timer.period}}")
{{etl.timer.delay}} 和 {{etl.timer.period}} 是我们将在 application.properties 文件中定义的配置值。
etl.timer.period=10000 etl.timer.delay=1000
在将数据插入目标数据库之前,我们需要提供翻译:
.process(exchange -> { final map<string object> rows = exchange.getin().getbody(map.class); final string username = (string) rows.get("username"); final string usernametouppercase = username.touppercase(); log.info("user name: {} converted to upper case: {}", username, usernametouppercase); rows.put("username", usernametouppercase); }) </string>
用于实现信息交换或实现信息转换器等用例的处理器接口。
看,我们用 apache camel 在两个数据库之间建立一个简单的数据库 etl 应用程序。
在操作应用程序时,应在日志中看到以下输出:
2024-06-09 13:15:49,257 INFO [route1] (Camel (camel-1) thread #1 - timer://userSync) Extracting Max last_modified value from source database 2024-06-09 13:15:49,258 INFO [route1] (Camel (camel-1) thread #1 - timer://userSync) No record found in target database 2024-06-09 13:15:49,258 INFO [route2] (Camel (camel-1) thread #1 - timer://userSync) The last_modified from source DB: 2024-06-09 13:15:49,274 INFO [route2] (Camel (camel-1) thread #1 - timer://userSync) Extracting records from source database 2024-06-09 13:15:49,277 INFO [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: john_doe converted to upper case: JOHN_DOE 2024-06-09 13:15:49,282 INFO [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: jane_smith converted to upper case: JANE_SMITH 2024-06-09 13:15:49,283 INFO [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: alice_miller converted to upper case: ALICE_MILLER
您可以在 github 在存储库中找到应用程序的完整源代码。
结论通过这个设置,我们使用它 apache camel 创造了一个简单的 etl 该应用程序从源数据库中提取数据,转换数据,然后加载到目标数据库中。该方法有助于减少操作数据库的负载,并允许我们在非高峰时间提取数据。
以上是Apache。 Camel 与 Quarkus 实用指南:构造 ETL 详情请关注图灵教育的其他相关文章!