Reposted from: https://blog.csdn.net/weimingyu945/article/details/77981884
-------------------------------------------------------------------------------------------------------
Basic operations:
# Create a SparkSession and check the Spark version
from pyspark.sql import SparkSession
sparksn = SparkSession.builder.appName("PythonSQL").getOrCreate()
print(sparksn.version)
# Convert between a Spark DataFrame and a pandas DataFrame
pandas_df = spark_df.toPandas()
spark_df = sqlContext.createDataFrame(pandas_df)
# Convert between a DataFrame and its underlying RDD
rdd_df = df.rdd
df = rdd_df.toDF()
# df.withColumn("xx", 0).show() raises an error: withColumn expects a Column object,
# so a constant value must be wrapped with functions.lit()
from pyspark.sql import functions
df = df.withColumn("xx", functions.lit(0))
df.show()
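A minimal self-contained sketch of the lit() pattern (the sample data, app name and the column name "flag" are illustrative, not from the original post):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("lit_demo").getOrCreate()  # hypothetical app name
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "letter"])

# df.withColumn("flag", 0) would fail; wrap the constant in lit()
df = df.withColumn("flag", F.lit(0))
df.show()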
# Fill null values; fill() needs a value (or a column-to-value dict), e.g.:
df.na.fill(0)
df.na.fill({"col_name1": 0, "col_name2": ""})
df = df.withColumn("count20", df["count"] - 20)  # new column: the existing "count" column minus 20
# Drop a column (both forms are equivalent)
df.drop("age").collect()
df.drop(df.age).collect()
df = df.na.drop()  # drop any row that contains a null in any column
df = df.dropna(subset=["col_name1", "col_name2"])  # drop rows where col_name1 or col_name2 is null
# Replace every value in an existing column with a constant (the constant must again be wrapped in lit)
df = df.withColumn("xx", functions.lit(1))
df = df.withColumn("year2", df["year1"].cast("int"))  # cast column year1 to integer, stored as new column year2
# Inner join two DataFrames on their key columns
df_join = df_left.join(df_right, df_left.key == df_right.key, "inner")
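A small self-contained sketch (the sample DataFrames and the column name "key" are illustrative): joining on an expression such as df_left.key == df_right.key keeps both key columns in the result, while joining on a list of column names keeps only one:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join_demo").getOrCreate()  # hypothetical app name
left = spark.createDataFrame([(1, "a"), (2, "b")], ["key", "val_l"])
right = spark.createDataFrame([(1, "x"), (3, "y")], ["key", "val_r"])

# Join on the shared column name so the result has a single "key" column
joined = left.join(right, ["key"], "inner")
joined.show()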
# Group rows by a column
GroupedData = df.groupBy("age")

# Apply a single aggregate function (group by column A, average column B within each group):
df.groupBy("A").avg("B").show()

# Apply several aggregate functions at once:
from pyspark.sql import functions
df.groupBy("A").agg(functions.avg("B"), functions.min("B"), functions.max("B")).show()
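A self-contained sketch of grouping with multiple aggregations (the sample data and the aliases avg_B/min_B/max_B are illustrative):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("groupby_demo").getOrCreate()  # hypothetical app name
df = spark.createDataFrame([("a", 1), ("a", 3), ("b", 5)], ["A", "B"])

# One output row per distinct value of A, with the average, minimum and maximum of B
df.groupBy("A").agg(F.avg("B").alias("avg_B"),
                    F.min("B").alias("min_B"),
                    F.max("B").alias("max_B")).show()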
df.foreach(f)  # or: df.rdd.foreach(f) -- apply f to every Row (runs on the executors)
df.foreachPartition(f)  # or: df.rdd.foreachPartition(f) -- apply f once per partition
# A DataFrame has no map()/reduce() methods in PySpark; go through the underlying RDD instead
df.rdd.map(func)
df.rdd.reduce(func)
# Convert every None value in a Row to an empty string (all columns except one excluded column);
# intended to live inside a class, and requires: from pyspark.sql import Row
@staticmethod
def map_convert_none_to_str(row):
    dict_row = row.asDict()
    for key in dict_row:
        if key != 'some_column_name':
            value = dict_row[key]
            if value is None:
                value_in = str("")
            else:
                value_in = str(value)
            dict_row[key] = value_in
    columns = dict_row.keys()
    v = dict_row.values()
    row = Row(*columns)
    return row(*v)
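A hedged sketch of how such a row-mapping helper is typically applied: map it over the DataFrame's RDD and rebuild a DataFrame. The container class MyJob, the app name and the sample data are illustrative, not from the original post:

from pyspark.sql import SparkSession, Row

class MyJob(object):  # hypothetical class holding the static method shown above
    @staticmethod
    def map_convert_none_to_str(row):
        dict_row = row.asDict()
        for key in dict_row:
            if key != 'some_column_name':
                dict_row[key] = "" if dict_row[key] is None else str(dict_row[key])
        return Row(*dict_row.keys())(*dict_row.values())

spark = SparkSession.builder.appName("none_to_str_demo").getOrCreate()
df = spark.createDataFrame([Row(a=1, b=None), Row(a=None, b="x")])

# Apply the helper through the RDD API, then rebuild a DataFrame
cleaned_df = spark.createDataFrame(df.rdd.map(MyJob.map_convert_none_to_str))
cleaned_df.show()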
df.show()    # print the first 20 rows (the default)
df.show(30)  # print the first 30 rows
df.printSchema()
row_list = df.head(3)  # e.g. [Row(a=1, b=1), Row(a=2, b=2), ...]
row_list = df.take(5)  # e.g. [Row(a=1, b=1), Row(a=2, b=2), ...]
row_list = df.collect()  # collect all rows to the driver
int_num = df.count()
# Keep only the rows where col_a is null
from pyspark.sql.functions import isnull
df = df.filter(isnull("col_a"))
from pyspark.sql import Row
r = Row(age=11, name='Alice')
print(r.__fields__)  # ['age', 'name']
df.select("name")
df.select(df["name"], df["age"] + 1)
df.select(df.a, df.b, df.c)           # select columns a, b and c
df.select(df["a"], df["b"], df["c"])  # select columns a, b and c
df = df.sort("age", ascending=False)  # sort by age in descending order
df = df.filter(df["age"] > 21)
df = df.where(df["age"] > 21)

# Filter on null or NaN values:
from pyspark.sql.functions import isnan, isnull
df = df.filter(isnull("a"))  # keep rows where column a is null (Python's None)
df = df.filter(isnan("a"))   # keep rows where column a is NaN (Not a Number)
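A small self-contained sketch of the difference between null and NaN (the sample data is illustrative): None becomes a SQL null, while float("nan") is a NaN, and isnull() does not match NaN:

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, isnull

spark = SparkSession.builder.appName("null_nan_demo").getOrCreate()  # hypothetical app name
df = spark.createDataFrame([(1.0,), (None,), (float("nan"),)], ["a"])

df.filter(isnull("a")).show()  # matches only the None row
df.filter(isnan("a")).show()   # matches only the NaN row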
df.createOrReplaceTempView("TBL1")  # register the DataFrame as a temporary SQL view named TBL1
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
ss = SparkSession.builder.appName("APP_NAME").config(conf=conf).getOrCreate()
df = ss.sql("SELECT name, age FROM TBL1 WHERE age >= 13 AND age <= 19")
# Weekly time windows: tumbling windows are aligned to the Unix epoch (1970-01-01, a Thursday),
# so startTime="4 day" shifts a 1-week window to start on Monday; col1 must be a timestamp column
from pyspark.sql.functions import window
win_monday = window("col1", "1 week", startTime="4 day")
GroupedData = df.groupBy([df.col2, df.col3, df.col4, win_monday])
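A self-contained sketch of a windowed aggregation (the timestamp column ts, the user column and the sample data are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, col

spark = SparkSession.builder.appName("window_demo").getOrCreate()  # hypothetical app name
events = spark.createDataFrame(
    [("2017-09-04 10:00:00", "a"),
     ("2017-09-05 11:00:00", "a"),
     ("2017-09-12 09:00:00", "b")],
    ["ts", "user"]).withColumn("ts", col("ts").cast("timestamp"))

# Count events per user inside each Monday-aligned 1-week window
events.groupBy(window("ts", "1 week", startTime="4 day"), "user") \
      .agg(count("*").alias("n_events")) \
      .show(truncate=False)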
mysql> SELECT
           MIN(yearD),
           MAX(yearD) AS max_year,
           Carrier,
           COUNT(*) AS cnt,
           SUM(IF(ArrDelayMinutes > 30, 1, 0)) AS flights_delayed,
           ROUND(SUM(IF(ArrDelayMinutes > 30, 1, 0)) / COUNT(*), 2) AS rate
       FROM
           ontime_part
       WHERE
           DayOfWeek NOT IN (6, 7)
           AND OriginState NOT IN ('AK', 'HI', 'PR', 'VI')
           AND DestState NOT IN ('AK', 'HI', 'PR', 'VI')
       GROUP BY carrier
       HAVING cnt > 1000 AND max_year > '1990'
       ORDER BY rate DESC, cnt DESC
       LIMIT 10;
scala> val jdbcDF = spark.read.format("jdbc").options(
           Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",
               "dbtable" -> "ontime.ontime_sm",
               "fetchSize" -> "10000",
               "partitionColumn" -> "yeard",
               "lowerBound" -> "1988",
               "upperBound" -> "2015",
               "numPartitions" -> "48")).load()

jdbcDF.createOrReplaceTempView("ontime")

val sqlDF = spark.sql("""
    SELECT
        MIN(yearD),
        MAX(yearD) AS max_year,
        Carrier,
        COUNT(*) AS cnt,
        SUM(IF(ArrDelayMinutes > 30, 1, 0)) AS flights_delayed,
        ROUND(SUM(IF(ArrDelayMinutes > 30, 1, 0)) / COUNT(*), 2) AS rate
    FROM
        ontime
    WHERE
        DayOfWeek NOT IN (6, 7)
        AND OriginState NOT IN ('AK', 'HI', 'PR', 'VI')
        AND DestState NOT IN ('AK', 'HI', 'PR', 'VI')
    GROUP BY carrier
    HAVING cnt > 1000 AND max_year > '1990'
    ORDER BY rate DESC, cnt DESC
    LIMIT 10""")
sqlDF.show()
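For completeness, a hedged PySpark sketch of the same partitioned JDBC read (the URL, table name and credentials are taken from the Scala example above; a MySQL JDBC driver must be on the classpath, and the follow-up query is only an illustrative example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc_demo").getOrCreate()  # hypothetical app name
df_jdbc = spark.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",
    dbtable="ontime.ontime_sm",
    fetchSize="10000",
    partitionColumn="yeard",
    lowerBound="1988",
    upperBound="2015",
    numPartitions="48").load()

df_jdbc.createOrReplaceTempView("ontime")
spark.sql("SELECT Carrier, COUNT(*) AS cnt FROM ontime GROUP BY Carrier ORDER BY cnt DESC LIMIT 10").show()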
Original article: https://www.cnblogs.com/juan-F/p/11347541.html