Pages

Monday, November 24, 2014

Apache Spark Twitter Streaming complete working code in Scala

The following code streams live public Twitter feeds in spark-shell. You will need to create an app on Twitter and insert its ConsumerKey, ConsumerSecret, AccessToken and AccessSecret into the code. You can change the filter terms in the filter array (val filter = Array("Twitter", "Hadoop", "Big Data")) in the code below.
[scala]
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import twitter4j.auth.Authorization
import twitter4j.Status
import twitter4j.auth.AuthorizationFactory
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.streaming.api.java.JavaStreamingContext

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.Function
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream

// Twitter application credentials. Replace every placeholder with the value
// issued for your own Twitter app before running the script.
val consumerKey = "<insert your consumer key here>"
val consumerSecret = "<insert your consumer secret here>"
val accessToken = "<insert access token>"
val accessTokenSecret = "<insert token secret>"

// Endpoint string that the script later passes to the twitter4j configuration builder.
val url = "https://stream.twitter.com/1.1/statuses/filter.json"

// Spark entry point; only the application name is configured here.
// NOTE(review): spark-shell already provides an `sc`; creating a second
// SparkContext may be rejected depending on the Spark version — confirm.
val sparkConf = new SparkConf().setAppName("Twitter Streaming")
val sc = new SparkContext(sparkConf)

// Text file split into whitespace-delimited tokens, one Seq per line.
// NOTE(review): the path is empty and nothing in this script reads `documents`;
// presumably a leftover from another example — confirm before relying on it.
val documents: RDD[Seq[String]] = sc.textFile("").map { line => line.split(" ").toSeq }

// Twitter Streaming
// Streaming context built on the SparkContext `sc`, using 2-second batches.
val ssc = new JavaStreamingContext(sc, Seconds(2))

// twitter4j configuration carrying the four OAuth credentials defined above.
val conf = new ConfigurationBuilder()
conf.setOAuthAccessToken(accessToken)
conf.setOAuthAccessTokenSecret(accessTokenSecret)
conf.setOAuthConsumerKey(consumerKey)
conf.setOAuthConsumerSecret(consumerSecret)
// NOTE(review): only the Authorization derived from this configuration is handed
// to TwitterUtils below, so these URL overrides presumably do not influence the
// stream that Spark opens — confirm, and drop them if so.
conf.setStreamBaseURL(url)
conf.setSiteStreamBaseURL(url)

// Terms passed to the stream as its filter; edit to follow other topics.
val filter = Array("Twitter", "Hadoop", "Big Data")

// Build the OAuth authorization and open the receiver-backed tweet stream.
val auth = AuthorizationFactory.getInstance(conf.build())
val tweets: JavaReceiverInputDStream[twitter4j.Status] = TwitterUtils.createStream(ssc, auth, filter)

// Keep only the text of each status and print a sample of every batch.
val statuses = tweets.dstream.map(status => status.getText)
statuses.print()

// Start the streaming computation. No awaitTermination() follows, so control
// returns to the caller (the spark-shell prompt, per the surrounding post).
ssc.start()
[/scala]