Here I am trying to execute Structured Based Streaming with Apache Kafka. But in here not working and execute error (ERROR MicroBatchExecution: Query [id = daae4c34-9c8a-4c28-9e2e-88e5fcf3d614, runId = ca57d90c-d584-41d3-a8de-6f9534ead0a0] terminated with error java.lang.UnsatisfiedLinkError:org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z). How can i solve this issue. I work on windows 10 machine.
App Class:
package com.rakib;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class App {
public static void main(String[] args) throws TimeoutException, StreamingQueryException {
System.setProperty("hadoop.home.dir", "c:/hadoop");
Logger.getLogger("org.apache").setLevel(Level.WARNING);
SparkSession sparkSession = SparkSession.builder()
.appName("SparkSQL")
.master("local[*]")
.getOrCreate();
Dataset<Row> rowDataset = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093")
.option("subscribe", "student")
.option("startingOffsets", "earliest")
.load();
rowDataset.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
//rowDataset.createOrReplaceTempView("student_info");
//Dataset<Row> dataset = sparkSession.sql("SELECT value FROM student_info");
StreamingQuery query = rowDataset
.writeStream()
.format("console")
.outputMode(OutputMode.Append())
.start();
query.awaitTermination();
}
}
Pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Test_One</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
</project>
Error:
20/08/20 23:37:21 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@71202043] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@1a4d79db]
20/08/20 23:37:21 ERROR MicroBatchExecution: Query [id = daae4c34-9c8a-4c28-9e2e-88e5fcf3d614, runId = ca57d90c-d584-41d3-a8de-6f9534ead0a0] terminated with error
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1221)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1426)
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:495)
at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:548)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1905)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1901)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1907)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1866)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1825)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.list(CheckpointFileManager.scala:299)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:186)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:272)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:194)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Exception in thread "stream execution thread for [id = daae4c34-9c8a-4c28-9e2e-88e5fcf3d614, runId = ca57d90c-d584-41d3-a8de-6f9534ead0a0]" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1221)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1426)
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:495)
at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:548)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1905)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1901)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1907)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1866)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1825)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.list(CheckpointFileManager.scala:299)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:186)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:272)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:194)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
=== Streaming Query ===
Identifier: [id = daae4c34-9c8a-4c28-9e2e-88e5fcf3d614, runId = ca57d90c-d584-41d3-a8de-6f9534ead0a0]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=true]
+- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@8774409, KafkaV2[Subscribe[student]]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1221)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1426)
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:495)
at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:548)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1905)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1901)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1907)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1866)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1825)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.list(CheckpointFileManager.scala:299)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:186)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:272)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:194)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
... 1 more
20/08/20 23:37:21 INFO SparkContext: Invoking stop() from shutdown hook
20/08/20 23:37:21 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-3147U79:4040
20/08/20 23:37:21 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/08/20 23:37:21 INFO MemoryStore: MemoryStore cleared
20/08/20 23:37:21 INFO BlockManager: BlockManager stopped
20/08/20 23:37:21 INFO BlockManagerMaster: BlockManagerMaster stopped
20/08/20 23:37:21 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/08/20 23:37:21 INFO SparkContext: Successfully stopped SparkContext
20/08/20 23:37:21 INFO ShutdownHookManager: Shutdown hook called
20/08/20 23:37:21 INFO ShutdownHookManager: Deleting directory C:\Users\itc\AppData\Local\Temp\temporary-850444d9-5110-4c13-881f-a6e0ba7153d8
20/08/20 23:37:21 INFO ShutdownHookManager: Deleting directory C:\Users\itc\AppData\Local\Temp\spark-813cc4f1-9d4b-44f2-99ae-435d9e99f566
Process finished with exit code 1
This error generally occurs due to the mismatch in your binary files in your %HADOOP_HOME%\bin
folder. So, what you need to do is to get hadoop.dll
and winutils.exe
specifically for your hadoop version.
Get hadoop.dll
and winutils.exe
for your specific hadoop version and copy them to your %HADOOP_HOME%\bin
folder.