一、练习1
读取文件,统计文件内单次出现的次数。
import os
from pyspark import SparkConf, SparkContext
#####定义PySpark运行环境
os.environ["PYSPARK_PYTHON"]="D:softpythonpython3104python.exe"
#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
#利用SparkConf构建SparkContext对象
sContex=SparkContext(conf=conf)
#打印pyspark的运行版本
print(sContex.version) #3.4.0
#读取文件中的内容
rdd_text=sContex.textFile("D:hello.txt")
#将所有的单次都取出来并拼接成二元元组
rdd_text_map=rdd_text.flatMap(lambda x: x.split(" ")).map(lambda x:(x,1))
#利用reduceByKey进行分组聚合
rdd_text_reduce=rdd_text_map.reduceByKey(lambda x,y:(x+y))
#[('java', 7), ('kettle', 2), ('python', 6), ('kettlekettle', 1), ('spark', 4), ('pyspark', 3)]
print(rdd_text_reduce.collect())
#停止SparkContext对象的运行
sContex.stop()
输出结果:
二、练习二
相关代码:
"""
完成练习案例:JSON商品统计
需求:
1. 各个城市销售额排名,从大到小
2. 全部城市,有哪些商品类别在售卖
3. 北京市有哪些商品类别在售卖
"""
import json
import os
from pyspark import SparkConf, SparkContext
#####定义PySpark运行环境
os.environ["PYSPARK_PYTHON"]="D:softpythonpython3104python.exe"
#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
#利用SparkConf构建SparkContext对象
sContex=SparkContext(conf=conf)
#打印pyspark的运行版本
print(sContex.version) #3.4.0
#读取文件中的内容
rdd_orders=sContex.textFile("D:orders.txt")
# TODO 需求1:城市销售额排名
# 1.1 读取文件得到RDD
# 1.2 取出一个个JSON字符串
rdd_orders_json=rdd_orders.flatMap(lambda x: x.split("|"))
#:['{"id":1,"timestamp":"2019-05-08T01:03.00Z","category":"平板电脑","areaName":"北京","money":"1450"}'.....
print(f"rdd_orders_json:{rdd_orders_json.collect()}")
# 1.3 将一个个JSON字符串转换为字典
rdd_orders_dict=rdd_orders_json.map(lambda x:json.loads(x))
#[{'id': 1, 'timestamp': '2019-05-08T01:03.00Z', 'category': '平板电脑', 'areaName': '北京', 'money': '1450'},
print(f"rdd_orders_dict:{rdd_orders_dict.collect()}")
# 1.4 取出城市和销售额数据
# (城市,销售额)
rdd_orders_map=rdd_orders_dict.map(lambda x:(x["areaName"],int(x["money"])))
#[('北京', '1450'), ('北京', '1450'), ('北京', '8412'), ('上海', '1513').....
print(f"rdd_orders_map:{rdd_orders_map.collect()}")
# 1.5 按城市分组按销售额聚合
res_01_rdd=rdd_orders_map.reduceByKey(lambda x,y:(x+y)).sortBy(lambda x:x[1],ascending=False,numPartitions=1)
# 1.6 按销售额聚合结果进行排序
#城市销售额排名:[('北京', 91556), ('杭州', 28831), ('天津', 12260), ('上海', 1513), ('郑州', 1120)]
print(f"城市销售额排名:{res_01_rdd.collect()}")
# TODO 需求2:全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
# 2.2 对全部商品类别进行去重
res_02_rdd=rdd_orders_dict.map(lambda x:x["category"]).distinct()
#全部城市有哪些商品类别在售卖:['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']
print(f"全部城市有哪些商品类别在售卖:{res_02_rdd.collect()}")
# TODO 需求3:北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_orders_rdd=rdd_orders_dict.filter(lambda x:x["areaName"]=='北京')
# 3.2 取出全部商品类别
# 3.3 进行商品类别去重
res_03_rdd=beijing_orders_rdd.map(lambda x:x["category"]).distinct()
#北京市有哪些商品类别在售卖:['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']
print(f"北京市有哪些商品类别在售卖:{res_03_rdd.collect()}")
输出结果:
推荐文章:
2、SpringBoot线程池ThreadPoolTaskExecutor异步处理百万级数据;
3、SpringBoot用线程池ThreadPoolExecutor处理百万级数据。
阅读全文
下载说明:
1、本站所有资源均从互联网上收集整理而来,仅供学习交流之用,因此不包含技术服务请大家谅解!
2、本站不提供任何实质性的付费和支付资源,所有需要积分下载的资源均为网站运营赞助费用或者线下劳务费用!
3、本站所有资源仅用于学习及研究使用,您必须在下载后的24小时内删除所下载资源,切勿用于商业用途,否则由此引发的法律纠纷及连带责任本站和发布者概不承担!
4、本站站内提供的所有可下载资源,本站保证未做任何负面改动(不包含修复bug和完善功能等正面优化或二次开发),但本站不保证资源的准确性、安全性和完整性,用户下载后自行斟酌,我们以交流学习为目的,并不是所有的源码都100%无错或无bug!如有链接无法下载、失效或广告,请联系客服处理!
5、本站资源除标明原创外均来自网络整理,版权归原作者或本站特约原创作者所有,如侵犯到您的合法权益,请立即告知本站,本站将及时予与删除并致以最深的歉意!
6、如果您也有好的资源或教程,您可以投稿发布,成功分享后有站币奖励和额外收入!
7、如果您喜欢该资源,请支持官方正版资源,以得到更好的正版服务!
8、请您认真阅读上述内容,注册本站用户或下载本站资源即您同意上述内容!
原文链接:https://www.shuli.cc/?p=14780,转载请注明出处。
1、本站所有资源均从互联网上收集整理而来,仅供学习交流之用,因此不包含技术服务请大家谅解!
2、本站不提供任何实质性的付费和支付资源,所有需要积分下载的资源均为网站运营赞助费用或者线下劳务费用!
3、本站所有资源仅用于学习及研究使用,您必须在下载后的24小时内删除所下载资源,切勿用于商业用途,否则由此引发的法律纠纷及连带责任本站和发布者概不承担!
4、本站站内提供的所有可下载资源,本站保证未做任何负面改动(不包含修复bug和完善功能等正面优化或二次开发),但本站不保证资源的准确性、安全性和完整性,用户下载后自行斟酌,我们以交流学习为目的,并不是所有的源码都100%无错或无bug!如有链接无法下载、失效或广告,请联系客服处理!
5、本站资源除标明原创外均来自网络整理,版权归原作者或本站特约原创作者所有,如侵犯到您的合法权益,请立即告知本站,本站将及时予与删除并致以最深的歉意!
6、如果您也有好的资源或教程,您可以投稿发布,成功分享后有站币奖励和额外收入!
7、如果您喜欢该资源,请支持官方正版资源,以得到更好的正版服务!
8、请您认真阅读上述内容,注册本站用户或下载本站资源即您同意上述内容!
原文链接:https://www.shuli.cc/?p=14780,转载请注明出处。
评论0