Python-PySpark数据计算练习案例

一、练习1

读取文件,统计文件内单次出现的次数。

import osfrom pyspark import SparkConf, SparkContext#####定义PySpark运行环境os.environ["PYSPARK_PYTHON"]="D:softpythonpython3104python.exe"#创建SparkConf类对象conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")#利用SparkConf构建SparkContext对象sContex=SparkContext(conf=conf)#打印pyspark的运行版本print(sContex.version)  #3.4.0#读取文件中的内容rdd_text=sContex.textFile("D:hello.txt")#将所有的单次都取出来并拼接成二元元组rdd_text_map=rdd_text.flatMap(lambda x: x.split(" ")).map(lambda x:(x,1))#利用reduceByKey进行分组聚合rdd_text_reduce=rdd_text_map.reduceByKey(lambda x,y:(x+y))#[('java', 7), ('kettle', 2), ('python', 6), ('kettlekettle', 1), ('spark', 4), ('pyspark', 3)]print(rdd_text_reduce.collect())
#停止SparkContext对象的运行sContex.stop()

输出结果:

二、练习二

相关代码:

"""完成练习案例:JSON商品统计需求:1. 各个城市销售额排名,从大到小2. 全部城市,有哪些商品类别在售卖3. 北京市有哪些商品类别在售卖"""import jsonimport osfrom pyspark import SparkConf, SparkContext#####定义PySpark运行环境os.environ["PYSPARK_PYTHON"]="D:softpythonpython3104python.exe"#创建SparkConf类对象conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")#利用SparkConf构建SparkContext对象sContex=SparkContext(conf=conf)#打印pyspark的运行版本print(sContex.version)  #3.4.0#读取文件中的内容rdd_orders=sContex.textFile("D:orders.txt")# TODO 需求1:城市销售额排名# 1.1 读取文件得到RDD# 1.2 取出一个个JSON字符串rdd_orders_json=rdd_orders.flatMap(lambda x: x.split("|"))#:['{"id":1,"timestamp":"2019-05-08T01:03.00Z","category":"平板电脑","areaName":"北京","money":"1450"}'.....print(f"rdd_orders_json:{rdd_orders_json.collect()}")# 1.3 将一个个JSON字符串转换为字典rdd_orders_dict=rdd_orders_json.map(lambda x:json.loads(x))#[{'id': 1, 'timestamp': '2019-05-08T01:03.00Z', 'category': '平板电脑', 'areaName': '北京', 'money': '1450'},print(f"rdd_orders_dict:{rdd_orders_dict.collect()}")# 1.4 取出城市和销售额数据# (城市,销售额)rdd_orders_map=rdd_orders_dict.map(lambda x:(x["areaName"],int(x["money"])))#[('北京', '1450'), ('北京', '1450'), ('北京', '8412'), ('上海', '1513').....print(f"rdd_orders_map:{rdd_orders_map.collect()}")# 1.5 按城市分组按销售额聚合res_01_rdd=rdd_orders_map.reduceByKey(lambda x,y:(x+y)).sortBy(lambda x:x[1],ascending=False,numPartitions=1)# 1.6 按销售额聚合结果进行排序#城市销售额排名:[('北京', 91556), ('杭州', 28831), ('天津', 12260), ('上海', 1513), ('郑州', 1120)]print(f"城市销售额排名:{res_01_rdd.collect()}")# TODO 需求2:全部城市有哪些商品类别在售卖# 2.1 取出全部的商品类别# 2.2 对全部商品类别进行去重res_02_rdd=rdd_orders_dict.map(lambda x:x["category"]).distinct()#全部城市有哪些商品类别在售卖:['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']print(f"全部城市有哪些商品类别在售卖:{res_02_rdd.collect()}")# TODO 需求3:北京市有哪些商品类别在售卖# 3.1 过滤北京市的数据beijing_orders_rdd=rdd_orders_dict.filter(lambda x:x["areaName"]=='北京')# 3.2 取出全部商品类别# 3.3 进行商品类别去重res_03_rdd=beijing_orders_rdd.map(lambda x:x["category"]).distinct()#北京市有哪些商品类别在售卖:['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']print(f"北京市有哪些商品类别在售卖:{res_03_rdd.collect()}")

输出结果:

    1SpringBoot使@Async线;

    2SpringBoot线ThreadPoolTaskExecutor;

    3SpringBoot线ThreadPoolExecutor

阅读全文
下载说明:
1、本站所有资源均从互联网上收集整理而来,仅供学习交流之用,因此不包含技术服务请大家谅解!
2、本站不提供任何实质性的付费和支付资源,所有需要积分下载的资源均为网站运营赞助费用或者线下劳务费用!
3、本站所有资源仅用于学习及研究使用,您必须在下载后的24小时内删除所下载资源,切勿用于商业用途,否则由此引发的法律纠纷及连带责任本站和发布者概不承担!
4、本站站内提供的所有可下载资源,本站保证未做任何负面改动(不包含修复bug和完善功能等正面优化或二次开发),但本站不保证资源的准确性、安全性和完整性,用户下载后自行斟酌,我们以交流学习为目的,并不是所有的源码都100%无错或无bug!如有链接无法下载、失效或广告,请联系客服处理!
5、本站资源除标明原创外均来自网络整理,版权归原作者或本站特约原创作者所有,如侵犯到您的合法权益,请立即告知本站,本站将及时予与删除并致以最深的歉意!
6、如果您也有好的资源或教程,您可以投稿发布,成功分享后有站币奖励和额外收入!
7、如果您喜欢该资源,请支持官方正版资源,以得到更好的正版服务!
8、请您认真阅读上述内容,注册本站用户或下载本站资源即您同意上述内容!
原文链接:https://www.shuli.cc/?p=14780,转载请注明出处。
0

评论0

显示验证码
没有账号?注册  忘记密码?