Apache Spark 2.0で導入された新しい機能であるStructured Streamingは、業界およびデータエンジニアリング業界で多くの注目を集めています。Spark SQLエンジンの上に構築されたStructured Streaming APIは、データをストリーミングするためのSQLライクなインターフェースを提供します。
当初、Apache Sparkは構造化ストリーミングクエリをマイクロバッチで処理し、待ち時間は約100ミリ秒でした。
昨年、バージョン2.3では、低レイテンシ(1ミリ秒)の「連続処理」が導入されました。これは、Structured Streamingの採用をさらに後押ししています。
Sparkの連続処理の速度で動作させるには、Redisのような高速ストリーミングデータベースで補強する必要があります。
このオープンソースのインメモリデータベースは、高速でミリ秒以下の待ち時間で知られています。Redis 5.0は最近Redis Streamsと呼ばれる新しいデータ構造を導入しました。これによって、Redisでストリーミングデータを複数のプロデューサーとコンシューマーの間で消費、保持、分配できるようになります。
今度の質問は、Redis StreamsをストリーミングデータベースとしてApache Sparkデータ処理エンジンと一緒にデプロイするための最良の方法は何かということです。
Scalaで書かれたSpark-Redisライブラリは、Apache SparkとRedisを統合しているので、次のことが可能です。
- RedisでRDDとしてデータを読み書きする
- データをRedisのDataFrameとして読み書きする(つまり、Spark SQLテーブルをRedisデータ構造にマッピングすることができます)
- Redis StreamsをStructured Streamingのソースとして使用する
- Structured Streamingの後にRedisをシンクとして実装する
この記事では、実際のシナリオを紹介し、RedisとApache Sparkを使用してストリーミングデータをリアルタイムで処理する方法について説明します。
仮想シナリオ:リアルタイムクリックの計算
私たちが、人気のあるウェブサイトにディスプレイ広告を掲載する広告会社であるとしましょう。ソーシャルメディア上の人気のある画像に基づいて動的なミームを作成し、それらを広告として配置します。収益を最大化するには、口コミで素早く広がったり、クリック数が増えたりする資産を特定して、それらをより頻繁に表示できるようにする必要があります。
ほとんどの資産の寿命は短く、クリックをリアルタイムで処理することで、傾向のある画像をすばやく活用することができます。これはビジネスにとって非常に重要です。私たちの理想的なストリーミングデータソリューションは、すべての広告クリックを記録し、それらをリアルタイムで処理し、さらに各アセットのリアルタイムクリック数を計算する必要があります。設計方法は次のとおりです。
図1. クリック数をリアルタイムで計算するためのビルディングブロック
入力
クリックするたびに、データインジェストソリューション(図1のブロック1)では、アセットIDと広告の費用がRedis Streamに次のように配置されます。
XADD clicks * asset [asset id] cost [actual cost]
下に例を示します。
XADD clicks * asset aksh1hf98qw7tt9q7 cost 29
出力
図1のブロック2によるデータ処理の後、結果はデータストアに格納されます。データクエリソリューション(図1のブロック3)は、データへのSQLインタフェースを提供しているので、直近数分間のトップクリックをクエリできます。
select asset, count from clicks order by count desc
asset count
----------------- -----
aksh1hf98qw7tt9q7 2392
i2dfb8fg023714ins 2010
jsg82t8jasvdh2389 1938
ソリューションを設計する
これでビジネス要件が定義できたので、Redis 5.0とApache Spark 2.4を使用してこのソリューションを構築する方法を探りましょう。この記事では、Scalaプログラミング言語で開発していますが、Spark-RedisライブラリをJavaまたはPythonでも使用できます。
[画像をクリックすると拡大します]
図2. ソリューションアーキテクチャ
このフロー図は非常にわかりやすいです。最初にシステムがデータをRedis Streamに取り込み、次にデータをSparkプロセスとして消費して結果をRedisに集約し、最後にSpark-SQLを使用してRedisの結果を照会します。
- データ取り込み:データ取り込みにはRedis Streamsを選択しました。Redisに組み込みのデータ構造で、1秒間に100万を超える読み書き操作を処理できるためです。さらに、時間に従ってデータを自動的に順序付けし、データの読み取り方法を効率化するコンシューマグループをサポートします。Spark-RedisライブラリはRedis Streamsをデータソースとしてサポートしているので、ストリーミングデータベースがApache Spark Engineと連携するという私たちの必要性に完全に合っています。
- データ処理:Apache SparkのStructured Streaming APIはデータ処理に最適です。Spark-Redisライブラリを使用すると、Redis Streamsに届いたデータをDataFrameに変換できます。Structured Streamingを使用すると、クエリをマイクロバッチまたはSparkの連続処理モードで実行できます。カスタムの「writer」を開発することもでき、それによって好みの宛先にデータを書くことができるようになります。図2に示すように、私たちはハッシュデータ構造を使用して出力をRedisに書き込みます。
- データクエリ:Spark-Redisライブラリでは、ネイティブのRedisデータ構造をDataFrameとしてマッピングできます。「一時テーブル」を宣言することができます。これはカラムをHashデータ構造の特定のキーにマッピングするもので、Redisは1ミリ秒以下の待ち時間で非常に高速であるため、Spark-SQLでリアルタイムのクエリ機能を使用することができます。
さて、私たちのソリューションの各コンポーネントをどのように開発して実行するかを説明します。まず、適切なツールを使用して開発環境を初期化しましょう。
適切な開発ツールを見つける
この例では、ソフトウェアをmacOSにダウンロードしてインストールするためにHomebrewパッケージマネージャを使用しますが、オペレーティングシステムによっては他のパッケージマネージャを選択してください。
1. Redis 5.0 or higher:まず、Redis 5.xを環境にダウンロードしてインストールする必要があります。以前のバージョンのRedisはRedis Streamsをサポートしていません。
Homebrewでは、Redis 5.0を次のようにインストールして起動します。
$ brew install Redis
$ brew services start Redis
以前のバージョンのRedisを実行している場合は、Redisをアップグレードできます。
$ brew upgrade Redis
2. Apacke Spark 2.3 or higher:次に、WebサイトからApache Sparkをダウンロードしてインストールするか、Homebrewを使ってインストールします。
$ brew install apache-spark
3. Scala 2.12.8 or higher:Scalaも同じようにインストールします。
$ brew install scala
4. Apache Maven:Spark-Redisライブラリを構築するにはMavenが必要です。
$ brew install maven
5. JDK 1.8 or higher: 次のコマンドを実行して、このJDKをOracleのWebサイトまたはHomebrewからダウンロードしてインストールします。JDKの最新バージョンについては、java8をjavaに置き換える必要があります。
$ brew cask install java8
6. Spark-Redisライブラリ:これが私たちのソリューションのコア部分なので、GitHubからライブラリをダウンロードして、以下のようにパッケージをビルドしましょう。
$ git clone https://github.com/RedisLabs/spark-redis.git
$ cd spark-redis
$ mvn clean package -DskipTests
これにより、./target/ディレクトリの下にspark-redis-<version>-jar-with-dependencies.jar が現れます。私の設定では、ファイルはspark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jarです。
7. SBT 1.2.8 or higher:SBTはScalaファイルのオーガナイズとビルドを簡単にできるようにするScalaビルドツールです。
$ brew install sbt
8. 開発環境:最後に、フォルダ構造とビルドファイルを設定しましょう。この例では、プログラムを「scala」ディレクトリの下にコーディングします。
$ mkdir scala
$ cd ./scala
次の内容で新しいファイルbuild.sbtを作成します。
name := "RedisExample"
version := "1.0"
scalaVersion := "2.12.8"
val sparkVersion = "2.4.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-catalyst" % sparkVersion
)
ディレクトリを初期化します。次のコマンドでパッケージディレクトリを初期化します。
$ mkdir ./src/main/scala/
$ mkdir ./lib
$ sbt package
spark-redis-<version>-jar-with-dependencies.jarをlibディレクトリにコピーします。
クリックカウントソリューションの構築
アーキテクチャのセクションで説明したように、このソリューションには3つの部分があります。データ取り込みコンポーネント、Sparkエンジン内のデータプロセッサ、データクエリインターフェイスです。このセクションでは、これらのそれぞれについて深く掘り下げ、実用的な解決策を紹介します。
1. Redisストリームへの取り込み
Redis Streamsは追加専用のデータ構造です。データがApache Sparkの連続処理ユニットによって消費されると仮定すると、メッセージ数を100万に制限することができます。実行するために前に表示されたコマンドをわずかに修正しましょう。
XADD clicks MAXLEN ~ 1000000 * asset aksh1hf98qw7tt9q7 cost 29
最も人気のあるRedisクライアントはRedis Streamsをサポートしているため、プログラミング言語に応じて、Pythonではredis-py、JavaではJedisまたはLettuce、Node.jsではnode-redisなどを選択できます。
[画像をクリックすると拡大します]
図3. データ取り込み
2. データ処理
このセクションを3つのサブセクションに分けて、ソリューションのこのパートを全てカバーします。
- A. Redis Streamからのデータの読み取りと処理
- B. 結果をRedisに保存する
- C. プログラムを実行する
[画像をクリックすると拡大します]
図4. データ処理
A. Redis Streamsからデータを読み取る
SparkでRedis Streamsからデータを読み取るには、Redis Streams内のデータのスキーマ構造と共に、Redisへの接続方法を確立する必要があります。
Redisに接続するには、Redis用の接続パラメータを使用して新しいSparkSessionを作成する必要があります。
val spark = SparkSession
.builder()
.appName("redis-example")
.master("local[*]")
.config("spark.redis.host", "localhost")
.config("spark.redis.port", "6379")
.getOrCreate()
スキーマ構造を設定するには、ストリームに「clicks」という名前を付け、「stream.keys」のオプションを「clicks」として設定します。各ストリーム要素にはアセットとそれに関連付けられたコストが含まれているので、2つのStructFieldを持つ配列であるStructTypeを作成します。下に示すように、1つは「asset」用、もう1つは「コスト」用です。
val clicks = spark
.readStream
.format("redis")
.option("stream.keys","clicks")
.schema(StructType(Array(
StructField("asset", StringType),
StructField("cost", LongType)
)))
.load()
最初のプログラムでは、1資産あたりのクリック数に関心があります。そのため、asset数でグループ化されたデータを含むDataFrameを作成しましょう。
val byasset = clicks.groupBy("asset").count
最後のステップは、Structured Streamクエリとしてクエリを開始することです。
val query = byasset
.writeStream
.outputMode("update")
.foreach(clickWriter)
.start()
結果をRedisに書き戻すために独自のForeachWriterを使用していることに注意してください。出力をコンソールに送りたい場合は、クエリを次のように書くことができます。
val query = byasset
.writeStream
.outputMode("update")
.format("console")
.start()
継続処理については、.trigger(Trigger.Continuous("1 second"))としてクエリに「trigger」コマンドを加えます。triggerコマンドは集計クエリでは機能しないため、この例では挿入できません。
以下は、Redis Streamsから新しいクリックデータを読み込み、SparkのStructured Streaming APIを使用して処理する完全なプログラムです。あなたの環境でこれを試したい場合は、src/main/scalaの下にClickAnalysis.scalaとしてプログラムを保存してください。(Redisサーバがポート6379でローカルに実行されていない場合は、必ず適切な接続パラメータを設定してください。)
// Program: ClickAnalysis.scala
//
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.redislabs.provider.redis._
object ClickAnalysis {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("redis-example")
.master("local[*]")
.config("spark.redis.host", "localhost")
.config("spark.redis.port", "6379")
.getOrCreate()
val clicks = spark
.readStream
.format("redis")
.option("stream.keys","clicks")
.schema(StructType(Array(
StructField("asset", StringType),
StructField("cost", LongType)
)))
.load()
val byasset = clicks.groupBy("asset").count
val clickWriter : ClickForeachWriter =
new ClickForeachWriter("localhost","6379")
val query = byasset
.writeStream
.outputMode("update")
.foreach(clickWriter)
.start()
query.awaitTermination()
} // End main
} //End object
B. 結果をRedisに保存する
結果をRedisに書き戻すために、ClickForeachWriterというカスタムのForeachWriterを開発できます。これはForeachWriterを拡張し、Redis用のJavaクライアントであるJedisを使ってRedisに接続します。これはClickForeachWriter.scalaとして保存された完全なプログラムです。
// Program: ClickForeachWriter.scala
//
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.Row
import redis.clients.jedis.Jedis
class ClickForeachWriter(p_host: String, p_port: String) extends
ForeachWriter[Row]{
val host: String = p_host
val port: String = p_port
var jedis: Jedis = _
def connect() = {
jedis = new Jedis(host, port.toInt)
}
override def open(partitionId: Long, version: Long):
Boolean = {
return true
}
override def process(record: Row) = {
var asset = record.getString(0);
var count = record.getLong(1);
if(jedis == null){
connect()
}
jedis.hset("click:"+asset, "asset", asset)
jedis.hset("click:"+asset, "count", count.toString)
jedis.expire("click:"+asset, 300)
}
override def close(errorOrNull: Throwable) = {
}
}
このプログラムで注意すべき重要な点が1つあります。結果はハッシュデータ構造に格納され、そのキーは構文「click:<asset id>」に従います。 この記事の最後のセクションでは、この構造をDataFrameとして使用するように変換します。もう1つのポイントは、キーの有効期限です。これはオプションです。ここでは、クリックが記録されるたびに、キーの寿命を5分延長する方法を説明します。
C. プログラムを実行する
プログラムを実行する前に、まずプログラムをコンパイルする必要があります。これを行うには、ホームディレクトリ(build.sbtを保存したディレクトリ)に移動して、次のコマンドを実行します。
$ sbt package
プログラムはエラーなしでコンパイルされるはずです。エラーが見つかった場合は、修正してsbtパッケージを再実行してください。プログラムを起動するには、同じディレクトリで次のコマンドを実行します。
spark-submit --class ClickAnalysis --jars
./lib/spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar
--master local[*] ./target/scala-2.12/redisexample_2.12-1.0.jar
デバッグメッセージが気に入らない場合は、プログラムを停止し(ctrl-c)、log4j.properties under /usr/local/Cellar/apache-spark/2.4.0/libexec/conf/を編集します(環境内でlog4j.propertiesファイルが格納されている場所であればどこでも)。そしてlog4j.rootCategoryを次のようにWARNに変更します。
log4j.rootCategory=WARN, console
このプログラムはRedis Streamから自動的にメッセージを引き出します。Redis Streamにメッセージがない場合は、新しいメッセージを非同期的にリッスンします。 別のコンソールでredis-cliを起動し、Redis Streamにメッセージを追加して、アプリがメッセージを正しく消費しているかどうかをテストできます。
$ redis-cli
redis-cli> XADD clicks * asset test cost 100
すべてがうまくいったら、ハッシュデータ構造内の結果を次のように読むことができます。
redis-cli> hgetall click:test
1) "asset"
2) "test"
3) "count"
4) "1"
3. データをクエリする:RedisデータをDataFrameとして読み込む
このソリューションの最後のコンポーネントは、基本的にRedisデータへのSQLインターフェースを提供します。SQLコマンドを介してデータを読み取ることは、ここでも2段階のプロセスとなります。a. Redisデータ用のSQLスキーマを定義し、b. SQLコマンドを実行します。
[画像をクリックすると拡大します]
図5. データクエリ
しかし、これを実行する前に、次に示すように、ホームディレクトリからコンソールでspark-sqlを実行する必要があります。
$ spark-sql --jars
./lib/spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar
これにより、次のようにspark-sqlプロンプトが表示されます。
spark-sql>
では、Redisハッシュデータ構造に格納されているデータのSQLスキーマを定義します。覚えているのであれば、click:<asset id>というキーで示されるハッシュデータ構造内に各アセットのデータを保存しています。ハッシュにはもう1つのキーであるcountがあります。スキーマを作成してそれをRedisハッシュデータ構造にマッピングするコマンドは次のとおりです。
spark-sql> CREATE TABLE IF NOT EXISTS clicks(asset STRING, count
INT) USING org.apache.spark.sql.redis OPTIONS (table 'click')
このコマンドは、「clicks」という名前の新しいテーブルビューを作成します。Spark-Redisライブラリで指定されているディレクティブを使用して、列「asset」と「count」をハッシュのそれぞれのフィールドにマップします。これで、クエリを次のように実行できます。
spark-sql> select * from clicks;
test 1
Time taken: 0.088 seconds, Fetched 1 row(s)
SQLクエリをプログラムで実行する場合は、ODBC/JDBCドライバを使用してSparkエンジンに接続する方法についてのApache Sparkが提供するドキュメントを参照してください。
私たちは何を達成したか
この記事では、Redis StreamsをApache Sparkエンジンのデータソースとして使用する方法と、Redis StreamsがStructured Streamingのユースケースをどのように活用するかについて説明しました。また、Apache SparkでDataFrames APIを使用してRedisデータを読み取る方法を示し、独立したスタンドアローンStructured StreamingおよびDataFramesの概念をまとめて、Spark-Redisライブラリを使用して実現できることを示しました。
Redis Streamsを使用すると、データを高速に収集および配布する作業が簡単になります。Apache SparkのStructured Streamingと組み合わせることで、IoT、不正検出、AIと機械学習、リアルタイム分析など、さまざまなシナリオ向けのリアルタイム処理を必要とするあらゆる種類のソリューションを強化できます。
著者について
Roshan Kumar氏は、Redis Labs、Inc.のシニアプロダクトマネージャです。彼は、ソフトウェア開発およびテクノロジ分野の製品管理の分野で豊富な経験を持っています。これまで、Kumar氏はHewlett-Packardと、シリコンバレーのいくつかの成功しているスタートアップ企業で働いてきました。彼は、コンピュータサイエンスの学士号と、カリフォルニア州サンタクララ大学でMBAを取得しています。