使用Spark进行流量日志分析


部分分析日志指标如下:

PV: 网站页面访问数。 UV: 页面IP的访问量统计,即独立IP。 PVPU:平均每位用户访问页面数。 Time: 用户每小时PV的统计。 Source: 用户来源域名的统计。 Browser: 用户的访问设备统计。

截取有意义的字段,本次排序的部分内容如下

其中第一列为时间戳, 其中第二列为手机号, 其中第三列为上行流量, 其中第四列为下行流量。

【图片】

需求描述

以手机号为基准,先按上行流量排序,如果上行流量相同,再按下行流量排序,以此类推再按时间戳排序,并将排序的结果生成文件

核心编程思想描述:

对于元祖的N次排序,使用sortBy()方法

具体实现步骤:

第一步:获取文件内容 第二步:将文件通过split(" ")空个切割成数组 第三步:将数组拼接成每行元祖 第四步:对元祖中指定列进行排序 第五步:将元祖恢复成string 第六步:输出成为文件

代码:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object LogSort {
    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Log Sort")
        val sc = new SparkContext(conf)

        // 获取文件
        val lines = sc.textFile("/home/katherine/project/test/sparkapp/input/access_20170504.log")

        // 先按上行流量,下行流量。再按时间戳排序
        val results = lines.map(_.split(" "))
                        .map(i => (i(0).toLong, i(1), i(2).toInt, i(3).toInt))
                        .sortBy(j => (j._3, j._4, j._1))
                        .map(i => i._1 + " " + i._2 + " " + i._3 + " " + i._4)

        // 排序后数据存文件
        results.saveAsTextFile("/home/katherine/project/test/sparkapp/output/LogSort")
    }
}

results matching ""

    No results matching ""