Spark wordcount Java版

作者: Ju4t

完整版

package com.ju4t.bigdata.spark.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark_WordCount {
  def main(args: Array[String]): Unit = {
    // Application
    // Spark框架
    // TODO 建立和Spark框架的连接
    // JDBC : Connection
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf);

    // TODO 执行业务操作
    // 1.读取文件,获取一行一行的数据
    // hello word
    val lines: RDD[String] = sc.textFile("data/WordCount")

    // 2.将一行数据进行拆分,形成一个一个的效果
    // 扁平化:将整体拆分成个体对操作
    // "hello word" => hello, word
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // 优化
    val wordToOne = words.map(
      word => (word, 1)
    )

    // 3.将数据根据单词分组,便于统计
    // (hello, hello, hello), (word, word, word)
    val wordGroup: RDD[(String, Iterable[(String, Int)])] = wordToOne.groupBy(
      t => t._1
    )

    // 4.对分组后对数据进行转换
    // (hello, hello, hello), (word, word, word)
    // (hello, 3), (word, 2)
    val wordToCount = wordGroup.map {
      case (word, list) => {
        list.reduce(
          (t1, t2) => {
            (t1._1, t1._2 + t2._2)
          }
        )
      }
    }

    // 5.将转换结果采集到控制台打印出来
    val array: Array[(String, Int)] = wordToCount.collect()
    array.foreach(println)

    // TODO 关闭连接
    sc.stop()
  }
}

精简

package com.ju4t.bigdata.spark.core.wc

import org.apache.spark.{SparkConf, SparkContext}

object Spark_WordCount {
  def main(args: Array[String]): Unit = {
    // Application
    // Spark框架
    // TODO 建立和Spark框架的连接
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf);

//    // TODO 执行业务操作
//    val lines = sc.textFile("data/WordCount")
//    val words = lines.flatMap(_.split(" "))
//
//    // 优化
//    val wordToOne = words.map(
//      word => (word, 1)
//    )
//
//    // spark 提供给了更多对功能,将分组和聚合使用一个方法来实现
//    // reduceByKey:相同的key的数据,可以对value进行reduce聚合
//    // wordToOne.reduceByKey((x,y)=>{ x + y })
//    val wordToCount = wordToOne.reduceByKey(_ + _) // 上面对简写
//    sc.textFile("data/WordCount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect()
//    val array = wordToCount.collect()

    // 这一句等于上面
    val array = sc.textFile("data/WordCount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect();
    array.foreach(println)

    // TODO 关闭连接
    sc.stop()
  }
}