キーポイント
- Apache Kafka (or Kafka) is a distributed event store and stream-processing platform for storing, consuming, and processing data streams.
- Quarkus integrates perfectly with reactive applications that use Apache Kafka for event processing.
- Quarkus Dev Services provides you with an automatic way to boot up a Kafka cluster to test your application.
- Apache Kafka can replace messaging systems like JMS, especially when you need real-time stream processing.
なぜApache Kafkaですか? それは何ですか?
今日のデータの処理/消費方法は、これまでの方法とは異なります。以前は、データはデータベースに保存され、分析を行うためにバッチ処理されていました。
このアプローチは正しいですが、モダンなプラットフォームでは、データがシステムに到来したときにリアルタイムでデータを処理できます。
Apache Kafka (Kafka) は、データストリーム処理で保存、消費、および処理するための分散イベントストアおよびストリーム処理プラットフォームです。
Kafkaには、その仕組みを理解するための5つの重要な概念があります:
- イベント (またはメッセージ): イベントは、処理されるシステムに保存されているデータを表すタイムスタンプ付きのキーと値のペアです。Kafkaの視点では、これは単なるバイトのかたまり (chunk) です。
- パーティション (Partition): パーティションは、イベントが生成および消費される場所です。パーティションでは、イベントの順序が保証されます。
- トピック (Topic): トピックは、1つ以上のパーティションで構成されます。トピックは、開発者がイベントを消費または生成するために使用する作業の単位です。
- コンシューマ (Consumer): コンシューマはトピックをサブスクライブし、イベントがトピックに発行されるたびに通知されます。
- プロデューサ (Producer): プロデューサは、イベントをトピック (実際はトピックに属したパーティションの1つ) に発行します。
Apache Kafkaの重要な側面の1つは、スケーラビリティとフォールトトレラントを念頭に置いて作られているため、ハイパフォーマンスアプリケーションに適していることです。Kafkaは、Java Message Service (JMS) やAdvanced Message Queuing Protocol (AMQP) などの従来のメッセージングシステムの代替と見なすことができます。
Apache Kafkaは、現在使用されているほとんどの言語と統合されていますが、この記事では、Javaとの統合、特にQuarkus Javaスタックについて説明します。
Quarkusとは何ですか?
Quarkusは、Java仮想マシン (JVM) およびネイティブコンパイル用に作成されたフルスタックのKubernetesネイティブJavaフレームワークです。Javaをコンテナ専用に最適化し、サーバレス、クラウド、およびKubernetes環境で効果的なプラットフォームにすることができます。
Quarkusは、車輪の再発明を行うのではなく、標準/仕様に裏打ちされたよく知られたエンタープライズグレードのフレームワークを使用し、GraalVMを使用してそれらをバイナリにコンパイルできるようにします。
QuarkusにKafkaを統合する方法
Quarkusは、SmallRye Reactive Messagingプロジェクトを使用してApache Kafkaと相互作用します。
Quarkus入門
Quarkusの使用を開始する最も簡単な方法は、スタートページで必要な依存関係を追加することです。サービスごとに依存関係が異なる場合があり、Java 11またはJava 17から選択できます。QuarkusとKafkaを統合するには、SmallRye Reactive Messaging - Kafka Connector 拡張を追加する必要があります。
開発中のアプリケーション
私たちは動画ストリーミング会社であり、1つのユースケースに動画の保存があるとします。これは、従来のデータベースを使用して実現できますが、優れたユーザーエクスペリエンスにはリアルタイムのやり取りが必要になるため、Kafkaに保存することにしました。
したがって、2つのサービスがあります。1つはユーザが動画の再生を停止するたびにイベントを生成し、もう1つはこれらのイベントを消費してサーバ側のイベントとして表示/ストリーミングします。
次の図は、アーキテクチャを示しています:
これらのサービスをQuarkusに実装して、内部の詳細についてもう少し説明しましょう。
動画再生プロデューサ
ユーザが動画を停止するたびに、このサービスはKafkaの PlaytimeMovies トピックにイベントを送信します。イベントには、動画IDと合計視聴時間が含まれます。デモの目的で、タイマーは、ユーザが動画を見たかどうかをシミュレートするロジックを自動的にトリガします。
サービスが開始されると、Kafkaの Movies トピックにいくつかの動画が生成されます。
プロジェクトの作成
Quarkusのスタートページに移動し、Kafkaと統合するために smallrye-reactive-messaging-kafka 拡張を選択します。次に、JSON-JavaからObject-Byte Arrayにイベントをマーシャリング/アンマーシャリングするために Jackson 拡張を選択します。また、Started Code 生成オプションのチェックも外します。
次のスクリーンショットのようになります:
この手動の手順をスキップして、すべてが選択されている Kafka Quarkus Generator リンクに移動するオプションがあります。次に、Generate your application ボタンを押して、足場が用意された (scaffolded) アプリケーションのzipファイルをダウンロードします。
ファイルをUnzip (解凍) し、好みのIDEでプロジェクトを開きます。
開発
では2つのPOJOを作成しましょう。1つは Movie
を表し、もう1つは PlayedMovie
を表します。
public class Movie {
public int id;
public String name;
public String director;
public String genre;
public Movie(int id, String name, String director, String genre) {
this.id = id;
this.name = name;
this.director = director;
this.genre = genre;
}
}
Movie には動画の id
、name
、director
と genre
が含まれます。
public int id;
public long duration;
public MoviePlayed(int id, long duration) {
this.id = id;
this.duration = duration;
}
}
MoviePlayed には、再生された動画の識別子 (id
) とユーザが動画を視聴した時間 (duration
) が含まれています。
Movie をKafkaトピックに保存し、再生された動画 (MoviePlayed) をシミュレートする責任を持つ MovieKafkaGenerator
という名前の新しいクラスが必要になります。
トピックにイベントの発行を開始するには、最初に2つのクラスが必要です。1つはアノテーション @Outgoing
で、Kafkaの Movies トピックを指すように構成されたチャネルの形式でイベントを送信する場所を指定するのに使用され、Record
クラスは、キーと値が存在するイベントのラッパーを表します。
次に、動画をKafkaの Movies トピックに生成する MovieKafkaGenerator
クラスを作成しましょう。
package org.acme.movieplays;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.Record;
@ApplicationScoped
public class MovieKafkaGenerator {
private List<Movie> movies = List.of(
new Movie(1, "The Hobbit", "Peter Jackson", "Fantasy"),
new Movie(2, "Star Trek: First Contact", "Jonathan Frakes", "Space"),
new Movie(3, "Encanto", "Jared Bush", "Animation"),
new Movie(4, "Cruella", "Craig Gillespie", "Crime Comedy"),
new Movie(5, "Sing 2", "Garth Jennings", "Jukebox Musical Comedy")
);
// Populates movies into Kafka topic
@Outgoing("movies")
public Multi<Record<Integer, Movie>> movies() {
return Multi.createFrom().items(movies.stream()
.map(m -> Record.of(m.id, m))
);
}
}
このクラスで次のように注意すべき重要なことがあります:
- このクラスは、CDIスコープのクラスです (
@ApplicationScoped
) - 簡単にする目的で、動画はリストで定義されています。
@Outgoing
アノテーションは、イベントの送信先 (動画チャネル) を設定するために使用されます。movies()
メソッドからの戻り値は、定義されたチャネルに自動的に送信されます。戻り型は、イベントの中身をラップするリアクティブ/非同期型 (たいていはQuarkusのio.smallrye.mutiny.Multi
) です。チャネルは後でトピックを指すように構成されます。Record
(イベント/メッセージ) には、キーとして動画IDがあり、値として Movie オブジェクトがあります。
最後のステップは、Kafkaインスタンスに接続するようにQuarkusパラメータを構成することです。Quarkusアプリケーションは、src/main/resources/
ディレクトリにある application.properties
ファイルで構成されます。
次の一般的なプロパティを使用して、チャネルとトピックの関係を簡単に構成できます:
mp.messaging.outgoing.<channel_name>.topic=<topic>
このサンプルアプリケーションでは、これは次のように定義されます:
mp.messaging.outgoing.movies.topic=movies
そうすると、Kafkaブローカの場所がどこで構成されているのか疑問に思われたかもしれません。QuarkusはKafka用に開発サービス (Dev Services) 機能を提供しているため、ローカル開発目的では必要ないのです。Dev Servicesは、Podmanやその他のOCI準拠ツールなどの動作中のコンテナランタイムで、必要な外部依存関係のインスタンス (つまり、データベースインスタンス、Kafkaブローカ、Keycloakサービスなど) をプロビジョニングします。開発者の視点から、拡張機能を含めて構成しない場合、Quarkusはサービスを自動的に起動し、それを使用するようにアプリケーションを構成します。
したがって開発サイクル中に他の構成パラメータは必要ありません。Quarkusが代わりに行ってくれます。
重要: このサンプルを実行するには、ローカルマシンでDockerホストを実行している必要があります。持っていない場合でKafkaブローカをデプロイしている場合は、「リモート」インスタンスをQuarkusアプリケーションに構成する方法について後で説明します。
Dockerホストを実行している状態で、ターミナルからQuarkus開発モードでアプリケーションを起動します:
./mvnw compile quarkus:dev
ターミナルには次のように出力されます:
[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-producer >--------------------
[INFO] Building movie-plays-producer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
….
2022-03-21 11:37:24,474 INFO [io.qua.sma.dep.processor] (build-8) Configuring the channel 'movies' to be managed by the connector 'smallrye-kafka'
2022-03-21 11:37:24,483 INFO [io.qua.sma.rea.kaf.dep.SmallRyeReactiveMessagingKafkaProcessor] (build-30) Generating Jackson serializer for type org.acme.movieplays.Movie
--
--
Checking Docker Environment 2022-03-21 11:37:25,018 INFO [org.tes.uti.ImageNameSubstitut
--
2022-03-21 11:37:28,500 INFO [io.qua.kaf.cli.dep.DevServicesKafkaProcessor] (build-22) Dev Services for Kafka started. Other Quarkus applications in dev mode will find the broker automatically. For Quarkus applications in production mode, you can connect to this by starting your application with -Dkafka.bootstrap.servers=OUTSIDE://localhost:32769
2022-03-21 11:37:29,581 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 6.666s.
2022-03-21 11:37:29,582 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-21 11:37:29,582 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
アプリケーションがコンパイルされ、Jacksonシリアライザ (最初に拡張を追加したことを思い出してください) を自動的に構成し、Movie
オブジェクトをバイト配列にシリアル化してKafkaトピックに保存します。次に、Kafkaブローカが localhost:32769
で自動的に起動し、アプリケーションはそれに接続するように自動的に構成されます。最後に、アプリケーションが起動し、すべての動画がKafkaの Movies トピックに挿入されます。
kcat ツールを使用して、トピックの内容を検査できます。ターミナルウィンドウで次のコマンドを実行し、Kafkaブローカのアドレスを自身のアドレスに置き換えます:
kcat -b localhost:32769 -t movies -C -K:
:{"id":1,"name":"The Hobbit","director":"Peter Jackson","genre":"Fantasy"}
:{"id":2,"name":"Star Trek: First Contact","director":"Jonathan Frakes","genre":"Space"}
:{"id":3,"name":"Encanto","director":"Jared Bush","genre":"Animation"}
:{"id":4,"name":"Cruella","director":"Craig Gillespie","genre":"Crime Comedy"}
:{"id":5,"name":"Sing 2","director":"Garth Jennings","genre":"Jukebox Musical Comedy"}
% Reached end of topic movies [0] at offset 5
アプリケーションを停止し、再生された動画を生成する部分を追加しましょう。
MovieKafkaGenerator
クラスを開いて次のコードを追加します:
private Random random = new Random();
@Inject
Logger logger;
@Outgoing("play-time-movies")
public Multi<Record<String, PlayedMovie>> generate() {
return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
.onOverflow().drop()
.map(tick -> {
Movie movie = movies.get(random.nextInt(movies.size()));
int time = random.nextInt(300);
logger.info("movie {0} played for {1} minutes", movie.name, time);
// Region as key
return Record.of("eu", new PlayedMovie(movie.id, time));
});
}
新しいメソッドで注意すべき重要なことがいくつかあります:
- イベントは、play-time-movies チャネルに生成されます。
- 新しいイベントが毎秒トリガーされます。
- map メソッドで動画がランダムに選択され、ランダムに再生される時間が割り当てられます。
Record
(イベント/メッセージ) が作成されます。ここでは、キーはユーザの地域を表し、値はPlayedMovie
オブジェクトです。
最後に、application.properties
ファイルを開き、新しいチャネルを構成します:
mp.messaging.outgoing.play-time-movies.topic=playtimemovies
アプリケーションを再起動すると、アプリケーションは毎秒新しいイベントを生成します。
./mvnw compile quarkus:dev
2022-03-21 12:36:01,297 INFO [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18258: Kafka producer kafka-producer-play-time-movies, connected to Kafka brokers 'OUTSIDE://localhost:32771', is configured to write records to 'playtimemovies'
2022-03-21 12:36:01,835 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 148 minutes
2022-03-21 12:36:02,336 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 288 minutes
2022-03-21 12:36:02,836 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 176 minutes
新しいイベントが発行されるたびに、コンソールにログ行が表示されます。kcatツールを使用してトピックの内容を調べてみましょう。
kcat -b localhost:32773 -t playtimemovies -C -K:
eu:{"id":4,"duration":88}
eu:{"id":3,"duration":291}
eu:{"id":1,"duration":228}
eu:{"id":2,"duration":165}
eu:{"id":1,"duration":170}
eu:{"id":4,"duration":75}
動画再生コンシューマ
このサービスには、Kafkaトピックからのイベントを消費する責任があります。消費されたイベントは、HTTPサーバ側のイベントを使用して呼び出し元にストリーミングされます。イベントは、再生された動画IDと視聴された合計時間を含む再生された動画データです。
プロジェクトの作成
Quarkusのスタートページに移動し、resteasy-reactive-jackson を選択して、JSONからJavaオブジェクトにマーシャリング/アンマーシャリングするためのJacksonサポートとKafka統合のための smallrye-reactive-messaging-kafka 拡張を含むJAX-RSリアクティブエンドポイントを実装します。また、Sarted Code 生成オプションのチェックも外します。
繰り返しますが、この手動の手順をスキップして、すべてが選択されている次の Kafka Quarkus Generator リンクに移動するオプションがあります。次に、Generate your application ボタンを押して、足場が用意されたアプリケーションのzipファイルをダウンロードします。
ファイルをUnzip (解凍) し、好みのIDEでプロジェクトを開きます。
開発
このサービスは PlayedMovie
イベントを処理するので、この要素の単純なPOJOを作成しましょう:
public class PlayedMovie {
public int id;
public long duration;
public MoviePlayed(int id, long duration) {
this.id = id;
this.duration = duration;
}
}
次に、PlayedMovieResource という名前の新しいクラスを作成し、Kafkaトピックから消費されたイベントをストリーミングするJAX-RSリアクティブエンドポイントを記述します。
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import io.smallrye.mutiny.Multi;
@Path("/movies")
public class PlayedMovieResource {
@Channel("played-movies")
Multi<MoviePlayed> playedMovies;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<PlayedMovie> stream() {
return playedMovives;
}
}
この小さなクラスは多くのことを行います:
@Path
アノテーションを使用して、/movies
エンドポイントをHTTPエンドポイントに公開します。- playing-movies という名前のチャネルでイベントを処理します。新しいイベントがチャネルに送信されるたびに (つまり、イベントがKafkaトピックに発行されるたびに)、そのイベントは自動的に
Multi
インスタンスに発行されます。 - HTTP GETメソッドを使用して
/movies
エンドポイントを呼び出すと、アプリケーションはチャネルで受信したイベントのストリーミングを開始します。
最後に、application.properties
ファイルでチャネルを構成してチャネル (トピックとオフセット戦略) を構成し、リスニングポートを 9090 に変更して、ポート 8080 のプロデューササービスと衝突しないようにします。
mp.messaging.incoming.movies-played.topic=playtimemovies
mp.messaging.incoming.movies-played.auto.offset.reset=earliest
%dev.quarkus.http.port=9090
動画再生プロデューサ (movie-player-producer) サービスを1つの端末で稼働させてから、動画再生コンシューマ (movie-player-consumer) を開始しましょう。新しいターミナルウィンドウで、サービスを開発モードで実行します。
./mvnw compile quarkus:dev
[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-consumer >--------------------
[INFO] Building movie-plays-consumer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
….
2022-03-21 17:59:08,079 INFO [io.qua.sma.dep.processor] (build-13) Configuring the channel 'movies-played' to be managed by the connector 'smallrye-kafka'
2022-03-21 17:59:08,092 INFO [io.qua.sma.rea.kaf.dep.SmallRyeReactiveMessagingKafkaProcessor] (build-33) Generating Jackson deserializer for type org.acme.movieplays.MoviePlayed
….
2022-03-21 17:59:10,617 INFO [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-movies, connected to Kafka brokers 'localhost:32771, belongs to the 'movie-plays-consumer'
….
2022-03-21 17:59:10,693 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-consumer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.114s. Listening on: http://localhost:9090
2022-03-21 17:59:10,693 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-21 17:59:10,694 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
アプリケーションがコンパイルされ、Jacksonデシリアライザ (最初に拡張機能を追加したことを思い出してください) を自動的に構成して、Kafkaトピックに格納されているバイト配列からJavaオブジェクトにオブジェクトをデシリアライズします。実行中のアプリケーションは、すでに開始されているKafkaクラスタを検出し、自動的にそれに接続します。最後に、アプリケーションはポート 9090 で開始されます。
新しいターミナルウィンドウで、次のcurlコマンドを実行してストリーミングデータを取得します:
curl -N localhost:9090/movies
data:{"id":4,"duration":213}
data:{"id":4,"duration":3}
data:{"id":3,"duration":96}
data:{"id":5,"duration":200}
data:{"id":2,"duration":234}
data:{"id":1,"duration":36}
data:{"id":1,"duration":162}
data:{"id":3,"duration":88}
データがKafkaトピックから自動的にストリーミングされ、HTTPリクエストとして送信される様子を観察できます。
前のサンプルは、イベントを受信するための Multi
インスタンスとしてチャネルを注入することと、@Incoming
アノテーションが付けられたメソッドにそれらのイベントを送信することの2つの目的を果たします。
コンシューマサービスを停止し、次のコードを PlayedMovieResource
クラスに追加して、Kafkaの Movies トピックのイベントを消費します:
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;
@Inject
Logger logger;
@Incoming("movies")
public void newMovie(Movie movie) {
logger.infov("New movie: {0}", movie);
}
ここでは、新しい動画が動画チャネル (Movies トピック) に送信されるたびに、newMovie()
メソッドが呼び出されます。メソッドパラメータは、Kafkaトピックのイベントのペイロードです。
Movies トピックを指すように、application.properties
ファイルでチャネルを構成します。
mp.messaging.incoming.movies.topic=movies
mp.messaging.incoming.movies.auto.offset.reset=earliest
ここで、movie-plays-consumer サービスを再度開始し、いくつかのログ行が動画のリストを表示していることに注目してください:
./mvnw compile quarkus:dev
[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-consumer >--------------------
[INFO] Building movie-plays-consumer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
…
2022-03-21 17:59:12,146 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-13) SRMSG18256: Initialize record store for topic-partition 'movies-0' at position -1.
2022-03-21 17:59:12,160 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit]
2022-03-21 17:59:12,164 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact]
2022-03-21 17:59:12,165 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto]
2022-03-21 17:59:12,166 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
2022-03-21 17:59:12,167 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]
外部Kafkaブローカ
application.properties
ファイルの kafka.bootstrap.servers
プロパティを構成するだけで、独自のKafkaブローカを利用することもできます。
kafka-bootstrap.servers=kafka:9092
結論
ここまでで、QuarkusアプリケーションでApache Kafkaに接続して、トピックのメッセージ/イベントの生成と消費を開始することが容易なことがわかりました。メッセージの消費は簡単です。それらが生成されている限り、それらを手に入れますが、それ以上のものではありません。データのリアルタイム処理 (たとえば、イベントのフィルタリングやイベントの操作) が必要な場合はどうなるのでしょうか? イベント間に何らかの相関関係を作成する必要がある場合はどうなるのでしょうか (playedmovie イベントには動画の id
がありますが、Movies トピックに参加して動画の名前を取得するにはどうすればよいのでしょうか)。
もちろん、送信されたすべてのデータを操作するためのアドホックなコード開発を始めることもできます。それでも、Kafka Streamsプロジェクトは、イベントの生成時にリアルタイムストリームを消費、変換の適用、ストリームへの接続すること等や、オプションで新しいデータ表現をトピックに書き戻すことに役立ちます。
Kafka Streamsは大きなトピックであり、リアルタイム処理の課題を解決するためのその多様性に輝いています。Kafka StreamsとQuarkusに関する完全な記事に尽くします。期待してください。