I have a simple PySpark program which publishes data into kafka. when i do a spark-submit, it gives error
Command being run :
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0 ~/PycharmProjects/Kafka/PySpark_Kafka_SSL.py
Error :
Traceback (most recent call last): File "/Users/karanalang/PycharmProjects/Kafka/PySpark_Kafka_SSL.py", line 33, in <module> df.write.format('kafka') File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 738, in save File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py", line 1309, in __call__ File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/protocol.py", line 326, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o41.save. : java.lang.NoClassDefFoundError: scala/$less$colon$less at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:180) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ... 42 more
Spark Version – 3.2.0; I’ve confluent kafka installed on my m/c, here is the version :
Karans-MacBook-Pro:confluent-6.2.1 karanalang$ confluent local services kafka version The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html 6.2.1-ce
Here is the code :
import sys, datetime, time, os from pyspark.sql.functions import col, rank, dense_rank, to_date, to_timestamp, format_number, row_number, lead, lag,monotonically_increasing_id from pyspark.sql import SparkSession, Window spark = SparkSession.builder.appName('StructuredStreaming_KafkaProducer').getOrCreate() kafkaBrokers='host:port' # CA Root certificate ca.crt caRootLocation='/Users/karanalang/Documents/Technology/strimzi/gcp-certs-dec3/caroot.pem' # user public (user.crt) certLocation='/Users/karanalang/Documents/Technology/strimzi/gcp-certs-dec3/my-bridge/my-bridge-user-crt.pem' # user.key keyLocation='/Users/karanalang/Documents/Technology/strimzi/gcp-certs-dec3/my-bridge/user-with-certs.pem' password='passwd' topic = "my-topic" df = spark.read.csv("data/input.txt", header=False) df.write.format('kafka') .option("kafka.bootstrap.servers",kafkaBrokers) .option("security.protocol","SSL") .option("ssl.ca.location",caRootLocation) .option("ssl.certificate.location", certLocation) .option("ssl.key.location",keyLocation) .option("ssl.key.password",password) .option("subscribe", topic) .save()
Any ideas what the issue is ? The Spark version seems to be matching the jar tia!
Advertisement
Answer
The error:
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
Usually pops up when something is not right with the Scala version.
If you run spark-shell
, you’ll get the output:
Welcome to ____ __ / __/__ ___ _____/ /__ _ / _ / _ `/ __/ '_/ /___/ .__/_,_/_/ /_/_ version 3.2.0 /_/ Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)
It says: Using Scala version 2.12.15
It also mentions: “For the Scala API, Spark 3.2.0 uses Scala 2.12. You will need to use a compatible Scala version (2.12.x)”, in the docs.
But when we look at the spark-sql-kafka-0-10_2.13:3.2.0
in the Maven repository: Kafka 0.10+ Source For Structured Streaming » 3.2.0 it says: Scala target: Scala 2.13
I would try to specify Scala version in spark-sql-kafka
, you can find desired Scala version by going to “View all targets”.
Try with: Kafka 0.10+ Source For Structured Streaming » 3.2.0:
Note the change: spark-sql-kafka-0-10_2.13:3.2.0 -> spark-sql-kafka-0-10_2.12:3.2.0
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 ~/PycharmProjects/Kafka/PySpark_Kafka_SSL.py