Spark SQL Pyspark update value in table to another value in table

I have a table that I can query with SQL. There are two columns: one is called Actor1Type1 and the other is called Actor2Type1. If a cell in column Actor1Type1 is '' and Actor2Type1 is not '', then I want to change the value of that cell to the value of Actor2Type1. I have no idea how to do this with Spark SQL because I am new to it. So far I have sqlContext.registerDataFrameAsTable(df, 'temp') new_df = sqlContext.sql("""SELECT CASE WHEN temp.Actor1Type1Code == '' AND temp.Actor2Type1Code !=
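One way to express this without finishing the raw SQL is a when/otherwise column expression; a minimal sketch, assuming the DataFrame is df and the columns carry the Actor1Type1Code / Actor2Type1Code names used in the question's SQL snippet:

from pyspark.sql import functions as F
# Overwrite Actor1Type1Code with Actor2Type1Code only when the first is empty
# and the second is not; otherwise keep the original value.
new_df = df.withColumn(
    "Actor1Type1Code",
    F.when(
        (F.col("Actor1Type1Code") == "") & (F.col("Actor2Type1Code") != ""),
        F.col("Actor2Type1Code"),
    ).otherwise(F.col("Actor1Type1Code")),
)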

Pyspark Livy return before the computation is over

Working with Livy on the Hortonworks Sandbox, we found that in some cases the Livy API returns a result for a statement before the actual work has ended, as indicated by the Spark monitoring tool. In some cases Livy returns a result in less than 5 seconds for a job whose normal execution time is more than 10 minutes. If we repeat the same task with the same initial conditions more than once, we sometimes see this problem. Any ideas?

pyspark: AnalysisException when joining two data frame

I have two data frames created from Spark SQL: df1 = sqlContext.sql(""" ...""") df2 = sqlContext.sql(""" ...""") I tried to join these two data frames on the column my_id like below: from pyspark.sql.functions import col combined_df = df1.join(df2, col("df1.my_id") == col("df2.my_id"), 'inner') Then I got the following error. Any idea what I missed? Thanks! AnalysisException Traceback (most recent call last) <ipython-input-11-45f5313387cc> in <module>()
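col("df1.my_id") only resolves if the frames are registered under those names; when working with DataFrame objects directly, the usual fix is to alias them before joining. A minimal sketch, assuming both frames share the join key my_id:

from pyspark.sql.functions import col
# Alias the DataFrames so the join condition can reference them by name.
combined_df = (
    df1.alias("a")
       .join(df2.alias("b"), col("a.my_id") == col("b.my_id"), "inner")
)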

How to properly label original observations with predicted clusters using k-means in PySpark?

I'd like to understand how the k-means method works in PySpark. For this, I've done this small example: In [120]: entry = [ [1,1,1],[2,2,2],[3,3,3],[4,4,4],[5,5,5],[5,5,5],[5,5,5],[1,1,1],[5,5,5]] In [121]: rdd_entry = sc.parallelize(entry) In [122]: clusters = KMeans.train(rdd_entry, k=5, maxIterations=10, initializationMode="random") In [123]: rdd_labels = clusters.predict(rdd_entry) In [125]: rdd_labels.collect() Out[125]: [3, 1, 0, 0, 2, 2, 2, 3, 2] In [126]: entry Out[126]: [[1, 1, 1

Bing map - How to use route api in pyspark using dataframe

I am trying to calculate the travel time using Bing Route API by passing latitude, longitude coordinates coming from dataframe columns. My code looks like this: def bing_maps(x_lat, x_long, y_lat, y_long): try: par = { 'wp.0': ''.join([x_lat, ',', x_long]), 'wp.1': ''.join([y_lat, ',', y_long]), 'avoid': 'minimizeTolls', 'key' : CMEConfig.bingKey } return requests.get(CMEConfig.bingURL, par).json()['resourceSets'][0]['resource

Pyspark VectorAssembler behavior and aggregating sparse data with dense

Can someone explain the behavior of VectorAssembler? from pyspark.ml.linalg import Vectors from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler( inputCols=['CategoryID', 'CountryID', 'CityID', 'tf'], outputCol="features") output = assembler.transform(tf) output.select("features").show(truncate=False) The show method returns (262147,[0,1,2,57344,61006,80641,126469,142099,190228,219556,221426,231784],[2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])

Pyspark: Adding a new column with the sum of row values for more than 255 columns

I need to find the sum of row values for around 900 columns. I applied the function from this link: Spark - Sum of row values from functools import reduce def superSum(*cols): return reduce(lambda a, b: a + b, cols) add = udf(superSum) df.withColumn('total', add(*[df[x] for x in df.columns])).show() but I got this error Py4JJavaError: An error occurred while calling o1005.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, m
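A Python UDF taking 900 arguments is a common source of this failure; the same row sum can be built from native Column arithmetic, which avoids the UDF entirely. A minimal sketch, assuming every column in df is numeric:

from functools import reduce
from operator import add
from pyspark.sql import functions as F
# Build one Column expression col1 + col2 + ... instead of a Python UDF.
row_sum = reduce(add, [F.col(c) for c in df.columns])
df.withColumn("total", row_sum).show()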

PySpark : Dataframe : Numeric + Null column values resulting in NULL instead of numeric value

I am facing a problem in a PySpark DataFrame loaded from a CSV file, where my numeric columns have empty values, like below:
+-------------+------------+-----------+-----------+
| Player_Name|Test_Matches|ODI_Matches|T20_Matches|
+-------------+------------+-----------+-----------+
| Aaron, V R| 9| 9| |
| Abid Ali, S| 29| 5| |
|Adhikari, H R| 21| | |
| Agarkar, A B| 26| 191| 4|
+--
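In Spark, any arithmetic with NULL yields NULL, so the empty cells have to be replaced with a number before adding. A minimal sketch, assuming the match-count columns should be treated as 0 when empty:

from pyspark.sql import functions as F
match_cols = ["Test_Matches", "ODI_Matches", "T20_Matches"]
# Replace NULLs with 0 so that addition no longer returns NULL.
df_filled = df.fillna(0, subset=match_cols)
df_total = df_filled.withColumn(
    "Total_Matches",
    sum(F.col(c) for c in match_cols),  # builds col1 + col2 + col3
)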

Pyspark Memory Errors with Iterative Spark DataFrame Creation

I am converting raw records that arrive to me in a single zlib compressed file into enriched parquet records for later processing in Spark. I don't control the zlib file, and I need the parquet consistent with other processing. I'm working in Pyspark and Spark 2.3. My approach works except when the zlib file is reasonably large (~300MB). It holds in memory fine, but Spark is running out of memory. If I shove my driver memory up (8g), it works. It feels like a memory leak from using function cal

Pyspark multiple when condition and multiple operation

I am working on some data where I need to check multiple conditions, and if those conditions match I want to calculate values into a new column in pyspark. I would appreciate any help. block2.withColumn("Duration", when((col("START_TS")== col("REP_WORK_DAY_TS")) & ((col("END_TS")== col("REP_WORK_DAY_TS")),(unix_timestamp("END_TIME") - unix_timestamp("START_TIME"))))).show(5) For example in the above code, I am applying two conditions and then I want to calculate
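The parentheses in that when() fold the second comparison and the value expression into one tuple, so the condition never groups as intended. A minimal sketch of the corrected grouping, assuming the column names from the question:

from pyspark.sql.functions import col, when, unix_timestamp
block2.withColumn(
    "Duration",
    when(
        (col("START_TS") == col("REP_WORK_DAY_TS"))
        & (col("END_TS") == col("REP_WORK_DAY_TS")),
        unix_timestamp("END_TIME") - unix_timestamp("START_TIME"),
    ),
).show(5)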

Pyspark SparkSQL (Databricks): Insert data into Snowflake Table, created by different role

I have a table MYSCHEMA.TEST_SNOWFLAKE_ROLE_T in Snowflake created using the role CONSOLE_USER. MYSCHEMA has a FUTURE GRANTS associated with it, which grants the following privileges to the role BATCH_USER for any table created under the schema MYSCHEMA - DELETE, INSERT, REFERENCES, SELECT, TRUNCATE, UPDATE. The role BATCH_USER also has CREATE STAGE and USAGE privileges on the schema MYSCHEMA. A second user belonging to the role BATCH_USER tries to insert data into the same table from a da

DOB field in Pyspark

I want to calculate age from the DOB field, but in my code I am hard-coding it; I need to do it dynamically, like today - DOB. Similarly I also want to calculate duration from start_date. My data frame looks like - id dob start_date 77 30/09/1990 2019-04-13 15:27:22 65 15/12/1988 2018-12-26 23:28:12 3 08/12/2000 2018-12-26 23:28
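One way to compute both values dynamically is to use current_date() with months_between and datediff; a minimal sketch, assuming dob is stored as dd/MM/yyyy text and start_date as a timestamp:

from pyspark.sql import functions as F
df2 = (
    df.withColumn("dob_date", F.to_date("dob", "dd/MM/yyyy"))
      # age in whole years, computed against today's date
      .withColumn("age", F.floor(F.months_between(F.current_date(), F.col("dob_date")) / 12))
      # duration in days since start_date
      .withColumn("duration_days", F.datediff(F.current_date(), F.col("start_date")))
)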

AWS SageMaker notebook list tables using boto3 and PySpark

Having some difficulty executing the following code in AWS SageMaker. It is supposed to just list all of the tables in DynamoDB. import boto3 resource = boto3.resource('dynamodb', region_name='xxxx') response = resource.tables.all() for r in response: print(r.name) If the SageMaker notebook kernel is set to "conda_python3" the code executes fine and the tables are listed out in the notebook as expected (this happens pretty much instantly). However, if I set the kernel to "Sparkmagic (Py

Pyspark Schema - Definition and Infer at the same time

How to define schema for some columns and infer schema for the rest? Is there a way to have partial schema definitions? Say I have columns a, b, c; I want to define schema for just a and infer for b and c. Additionally, I'm defining schema while reading.
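Spark's reader takes either a full schema or inferSchema, not a mix, so a common workaround is to infer everything and then cast the columns you care about (or define all columns as strings and cast afterwards). A minimal sketch, assuming a SparkSession named spark, a placeholder file path, and that column a should be forced to IntegerType:

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
# Let Spark infer b and c, then override the type of a explicitly.
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df = df.withColumn("a", F.col("a").cast(IntegerType()))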

Pyspark Py4JJavaError: An error occurred while calling o27.sessionState

I am trying to use spark in jupyter notebook. Here is my code and errors. Thanks. spark = SparkSession.builder \ .master("local") \ .appName("Image Retrieval") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() Error log: Py4JJavaError: An error occurred while calling o27.sessionState. : java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.parser.AbstractSqlParser.(Lorg/apache/spark/sql/internal/SQLConf;)V at org.apache.spark.sql.exe

Remove rows with a certain value in any column in pyspark

I am working in pyspark to clean a data set. The data set has "?" in various rows and columns, and I want to remove any row that has that value anywhere in it. I tried the following: df = df.replace("?", "np.Nan") df=df.dropna() However, it did not remove those values. I keep looking online but can't find any understandable answers (I am a newbie).
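replace("?", "np.Nan") just substitutes the literal string "np.Nan", which dropna() does not treat as null; filtering the rows out directly avoids that trap. A minimal sketch, assuming the "?" values appear in string columns:

from functools import reduce
from pyspark.sql import functions as F
# Build one boolean expression: True if any column equals "?".
# eqNullSafe treats existing NULLs as not-equal instead of producing NULL.
has_qmark = reduce(
    lambda a, b: a | b, [F.col(c).eqNullSafe("?") for c in df.columns]
)
df_clean = df.filter(~has_qmark)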

pyspark timestamp with timezone

I am trying to extract a value from a table using pyspark and I need the value in this format: 2020-06-17T15:08:24z df = spark.sql('select max(lastModDt) as lastModDate from db.tbl') jobMetadata = existingMaxModifiedDate.withColumn("maxDate", date_format(to_timestamp(existingMaxModifiedDate.lastModDate, "yyyy-mm-dd HH:MM:SS.SSS"), "yyyy-mm-dd HH:MM:SS.SSS")) However, I keep getting null for the created column "maxDate". Thank you.
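The pattern letters are the likely culprit: in Spark's datetime patterns, MM is months and mm is minutes, so "yyyy-mm-dd HH:MM:SS.SSS" never matches and the parse returns null. A minimal sketch of corrected patterns, assuming lastModDate is a "yyyy-MM-dd HH:mm:ss.SSS"-style string:

from pyspark.sql.functions import date_format, to_timestamp, col
jobMetadata = df.withColumn(
    "maxDate",
    date_format(
        to_timestamp(col("lastModDate"), "yyyy-MM-dd HH:mm:ss.SSS"),
        "yyyy-MM-dd'T'HH:mm:ss'Z'",  # ISO-8601 style output
    ),
)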

Reading a tsv file in pyspark

I want to read a tsv file but it has no header. I am creating my own schema and then trying to read the TSV file, but after applying the schema it is showing all column values as null. Below is my code and result. from pyspark.sql.types import StructType,StructField,StringType,IntegerType schema = StructType([StructField("id_code", IntegerType()),StructField("description", StringType())]) df=spark.read.csv("C:/Users/HP/Downloads/`connection_type`.tsv",schema=schema) df.show(
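spark.read.csv defaults to a comma separator, so a tab-separated file is parsed as one big field that doesn't match the schema and comes back null. A minimal sketch, reusing the schema and path from the question:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("id_code", IntegerType()),
    StructField("description", StringType()),
])
# sep="\t" tells the CSV reader the file is tab-delimited.
df = spark.read.csv(
    "C:/Users/HP/Downloads/connection_type.tsv",
    schema=schema,
    sep="\t",
)
df.show()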

How to track progress of a long job in Amazon EMR with pyspark?

My code comprises one long job: from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("PythonPi")\ .getOrCreate() sc = spark.sparkContext from time import sleep import os def f(_): sleep(1.0) print("executor running") # <= I can find it in the log, but only after the job ended with open(os.path.expanduser("~/output.txt"), "w") as f: # <= can not find this file on master node f.write("exec

Weighted average calculation in pyspark when multiple columns contain null

I have a spark dataframe of the following form. I want to calculate a weighted overall score that gives weights x1, x2, x3 to task, process and functions respectively. The formula for the score would be len(common_i) / (len(Name_i) + len(Ref_Name_i) - len(common_i)), where i stands for task/process/functions. However, for the cases where [Name_i] and [Ref_Name_i] are both null, I want to put the denominator for the weighted average as (1 - xi) instead of 1. Can anyone help me on

Pyspark SparkSQL ,Spark DataFrame : batch rename csv headers

I'm trying to change the headers of a very large csv file. I use SparkSQL. All headers have some_string in each header name, like some_string.header_name My Spark configuration: conf = SparkConf().setMaster("local[*]").setAppName("readCSV") To read the csv file I use the com.databricks.spark.csv package logs_df = sqlContext.load( source = "com.databricks.spark.csv", header = 'true', inferSchema ='true', path = 'my_file.csv' ) my code header = logs_df.first() schemaString = header

Pyspark Join two data frames, select all columns from one and some columns from the other

Let's say I have a spark data frame df1, with several columns (among which the column id) and data frame df2 with two columns, id and other. Is there a way to replicate the following command sqlContext.sql("SELECT df1.*, df2.other FROM df1 JOIN df2 ON df1.id = df2.id") by using only pyspark functions such as join(), select() and the like? I have to implement this join in a function and I don't want to be forced to have sqlContext as a function parameter. Thanks!
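One way, a minimal sketch assuming both frames share the key column id:

# Select every column from df1 plus the single column "other" from df2.
combined = df1.join(df2, df1.id == df2.id).select(df1["*"], df2["other"])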

Pyspark Extract byte from Spark BinaryType

I have a table with a binary column of type BinaryType: >>> df.show(3) +--------+--------------------+ | t| bytes| +--------+--------------------+ |0.145533|[10 50 04 89 00 3...| |0.345572|[60 94 05 89 80 9...| |0.545574|[99 50 68 89 00 7...| +--------+--------------------+ only showing top 3 rows >>> df.schema StructType(List(StructField(t,DoubleType,true),StructField(bytes,BinaryType,true))) If I extract the first byte of the binary, I get an exception

Programmatically specifying the schema in PySpark

I'm trying to create a dataframe from an rdd. I want to specify schema explicitly. Below is the code snippet which I tried. from pyspark.sql.types import StructField, StructType , LongType, StringType stringJsonRdd_new = sc.parallelize(('{"id": "123", "name": "Katie", "age": 19, "eyeColor": "brown" }',\ '{ "id": "234","name": "Michael", "age": 22, "eyeColor": "green" }',\ '{ "id": "345", "name": "Simone", "age": 23, "eyeColor": "blue" }')) mySchema = StructType([StructField("id", LongType

Pyspark groupBy Pivot Transformation

I'm having a hard time framing the following Pyspark dataframe manipulation. Essentially I am trying to group by category and then pivot/unmelt the subcategories and add new columns. I've tried a number of ways, but they are very slow and are not leveraging Spark's parallelism. Here is my existing (slow, verbose) code: from pyspark.sql.functions import lit df = sqlContext.table('Table') #loop over category listids = [x.asDict().values()[0] for x in df.select("category").distinct().c
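Rather than looping over categories on the driver, Spark can do this in one pass with groupBy().pivot().agg(). A minimal sketch, assuming columns named category, subcategory and value, which are stand-ins for the real column names:

from pyspark.sql import functions as F
# One wide row per category, one column per subcategory.
pivoted = (
    df.groupBy("category")
      .pivot("subcategory")
      .agg(F.first("value"))
)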

How to ignore headers in PySpark when using Athena and AWS Glue Data Catalog

Assume I have a CSV file like this: "Col1Name", "Col2Name" "a", "b" "c", "d" Assume I issue the following CREATE EXTERNAL TABLE command in Athena: CREATE EXTERNAL TABLE test.sometable ( col1name string, col2name string ) row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde' with serdeproperties ( 'separatorChar' = ',', 'quoteChar' = '\"', 'escapeChar' = '\\' ) stored as textfile location 's3://somebucket/some/path/' tblproperties("skip.header.line.count"="1") Then

Using KuduContext in pyspark

I would like to use kudu with pyspark. While I can use it with: sc.read.format('org.apache.kudu.spark.kudu').option('kudu.master',"hdp1:7051").option('kudu.table',"impala::test.z_kudu_tab").load() I cannot find a way to import KuduContext. I'm working in a jupyter notebook, and importing it with: os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 2g --packages com.ibm.spss.hive.serde2.xml:hivexmlserde:1.0.5.3 --packages org.apache.kudu:kudu-spark2_2.11:1.7.0 pyspark-shell" My not worki

Pyspark ImportError: cannot import name 'Pregel' from 'graphframes.lib'

I am using pyspark and graphframes from jupyter. I am able to successfully import pyspark and graphframes, but when I try: from graphframes.lib import Pregel I get the following error: ImportError: cannot import name 'Pregel' from 'graphframes.lib' This post is how I was able to get graphframes to work, but without graphframes.lib: https://github.com/graphframes/graphframes/issues/104 wget https://github.com/graphframes/graphframes/archive/release-0.2.0.zip unzip release-0.2.0.zip cd g

Pyspark impute missing values in time series

I am using Pyspark to analyse some time series data. My data looks like this:
Key | time | value
--------------------
A | t0 | null
A | t1 | 1.5
A | t2 | 1.7
B | t3 | 0.5
B | t4 | null
B | t5 | 1.1
C | t6 | 4.3
C | t7 | 3.4
C | t8 | null
C | t9 | 2.7
It's safe to assume the relationship between "time" and "value" is approximately linear. I want to interpolate the null values by training a linear regression from the rema

Dynamic boolean join in pyspark

I have two pyspark dataframes with the same schema as below - df_source: id, name, age df_target: id, name, age "id" is the primary column in both tables and the rest are attribute columns. I am accepting the primary and attribute column lists from the user as below - primary_columns = ["id"] attribute_columns = ["name","age"] I need to join the above two dataframes dynamically as below - df_update = df_source.join(df_target, (df_source["id"] == df_target["id"]) & ((df_source["name"] != df_target["name"
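The join condition can be assembled from the two lists with reduce, so it adapts to whatever columns the user supplies. A minimal sketch, assuming equality on every primary column and a difference in at least one attribute column:

from functools import reduce
# Equality on all primary key columns.
key_cond = reduce(
    lambda a, b: a & b,
    [df_source[c] == df_target[c] for c in primary_columns],
)
# "Changed" rows: any attribute column differs.
attr_cond = reduce(
    lambda a, b: a | b,
    [df_source[c] != df_target[c] for c in attribute_columns],
)
df_update = df_source.join(df_target, key_cond & attr_cond)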

pyspark: evaluate the sum of all elements in a dataframe

I am trying to evaluate, in pyspark, the sum of all elements of a dataframe. I wrote the following function def sum_all_elements(df): df = df.groupBy().sum() df = df.withColumn('total', sum(df[colname] for colname in df.columns)) return df.select('total').collect()[0][0] To speed up the function I have tried to convert to rdd and sum as def sum_all_elements_pyspark(df): res = df.rdd.map(lambda x: sum(x)).sum() return res But apparently the rdd function is slower than th
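A single-pass alternative is to sum each column and add those per-column sums in one select, which stays in the DataFrame API and avoids both the extra withColumn and the RDD conversion. A minimal sketch, assuming all columns in df are numeric:

from functools import reduce
from operator import add
from pyspark.sql import functions as F
def sum_all_elements(df):
    # Sum every column, then add the per-column sums into one value.
    total_expr = reduce(add, [F.sum(c) for c in df.columns])
    return df.select(total_expr.alias("total")).collect()[0][0]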

Pyspark Glue Job Writes Multiple Partitions to the Same File

I'm trying to write a glue job that converts multiple csv files to separate json files, using each row of the csv for a file. When the job finishes, the correct number of files show up in s3, but some are empty and some have multiple json objects in the same file. After I apply the mapping, this is how I create the partitions and write the files: numEntities = applyMapping1.toDF().count() partitions = applymapping1.repartition(numEntities) partitions.toDF().write.mode("ignore").format(

how to detect if your code is running under pyspark

For staging and production, my code will be running on PySpark. However, in my local development environment, I will not be running my code on PySpark. This presents a problem from the standpoint of logging. Because one uses the Java library Log4J via Py4J when using PySpark, one will not be using Log4J for the local development. Thankfully, the API for Log4J and the core Python logging module are the same: once you get a logger object, with either module you simply debug() or info() etc. Thus,

Pyspark IF and ELSE statement in spark sql expression

I am looking to run a sql expression that checks for the next event that is either 'DELIVERED' or 'ORDER-CANCELED' and returns a different result depending on which is first. df = spark.createDataFrame([["ORDER", "2009-11-23", "1"], ["DELIVERED", "2009-12-17", "1"], ["ORDER-CANCELED", "2009-11-25", "1"], ["ORDER", "2009-12-03", "1"]]).toDF("EVENT", "DATE", "

Pyspark Spark aggregation / group by so as to determine a new column's value based on col value in a set

I have some data that will be grouped by id.
id, field
0 A
0 B
0 C
1 B
1 B
1 C
2 E
I want to group by id and calculate a simple new value, is_special, which is true if any field value in the group is in a special set {A, E} (just a random set of letters, no pattern).
id, is_special
0 True
1 False
2 True
Something like this question but in pyspark. I want to understand how to do this group by without actually grouping, and just create
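One way is to aggregate a per-row membership flag with max, which is true for the group if any row matched; a minimal sketch, assuming a DataFrame df with columns id and field:

from pyspark.sql import functions as F
result = (
    df.groupBy("id")
      .agg(
          # 1 if the row's field is in the special set, else 0; max > 0 means "any".
          F.max(F.col("field").isin("A", "E").cast("int"))
           .cast("boolean")
           .alias("is_special")
      )
)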

Pyspark Dataframes Resolved attribute(s) error with no matching column names

I have a dataframe graphcounts with a hero Id and connections as below +------+-----------+ |heroId|connections| +------+-----------+ | 691| 7| | 1159| 12| | 3959| 143| | 1572| 36| | 2294| 15| | 1090| 5| | 3606| 172| | 3414| 8| | 296| 18| | 4821| 17| | 2162| 42| | 1436| 10| | 1512| 12| I have another dataframe graph_names with hero id and names as below. +---+-----------------

PySpark loop in groupBy aggregate function

I have a big table for which I'm trying to calculate sums (with conditions) of some columns, grouping by a location. My code looks like this, and I have more and more columns df.groupBy(location_column).agg( F.sum(F.when(F.col(col1) == True, F.col(value))).alias("SUM " + col1), F.sum(F.when(F.col(col2) == True, F.col(value))).alias("SUM " + col2), F.sum(F.when(F.col(col3) == True, F.col(value))).alias("SUM " + col3), .... # Add
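The repeated aggregations can be generated with a list comprehension and unpacked into agg(), so adding a column only means adding a name to a list. A minimal sketch, assuming col1, col2, col3, value and location_column are the variables holding column names from the question:

from pyspark.sql import functions as F
flag_cols = [col1, col2, col3]  # extend with the remaining column names
aggs = [
    F.sum(F.when(F.col(c) == True, F.col(value))).alias("SUM " + c)
    for c in flag_cols
]
df.groupBy(location_column).agg(*aggs)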

Pyspark load-csv does not show the real schema of a new file (only the "infered" schema)

I'm trying to load a csv from a partitioned folder with pyspark: mnt/data/test/ingestdatetime=20210208/test_20210208.csv df = spark.read.csv("mnt/data/test") df = df.filter(df['ingestdatetime'] == '20210208') Basically I want to see if the schema is different from what it is supposed to be (the data does not come with headers, so I cannot compare headers). The issue is that, whenever I'm loading the data at the top level "data/test/", the schema is “inferred” based on a few rows,

Split column of list into multiple columns in the same PySpark dataframe

I have the following dataframe which contains 2 columns: the 1st column has column names and the 2nd column has a list of values. +--------------------+--------------------+ | Column| Quantile| +--------------------+--------------------+ | rent|[4000.0, 4500.0, ...| | is_rent_changed|[0.0, 0.0, 0.0, 0...| | phone|[7.022372888E9, 7...| | Area_house|[1000.0, 1000.0, ...| | bedroom_count|[1.0, 1.0, 1.0, 1...| | bathroom_count|[1.0, 1
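If the lists all have the same length, the array column can be split with getItem; a minimal sketch, assuming each Quantile array holds a fixed number of elements n (n = 5 here is an assumption):

from pyspark.sql import functions as F
n = 5  # assumed number of elements in each Quantile list
split_df = df.select(
    "Column",
    *[F.col("Quantile").getItem(i).alias(f"q{i}") for i in range(n)],
)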

Pyspark Rename Column in Athena

The Athena table "organization" reads data from parquet files in s3. I need to change a column name from "cost" to "fee". The data files go back to Jan 2018. If I just rename the column in Athena, the table won't be able to find data for the new column in the parquet files. Please let me know if there are ways to resolve this.

Pyspark - Merge consecutive duplicate rows but maintain start and end dates

I have a dataframe with the following format... id , name, start_date, end_date , active 1 , albert , 2019-08-14, 3499-12-31, 1 1 , albert , 2019-08-13, 2019-08-14, 0 1 , albert , 2019-06-26, 2019-08-13, 0 1 , brian , 2018-01-17, 2019-06-26, 0 1 , brian , 2017-07-31, 2018-01-17, 0 1 , albert , 2017-03-31, 2018-07-31, 0 2 , diane , 2019-07-14, 3499-12-31, 1 2 , diane , 2019-06-13, 2019-07-14, 0 2 , ethel , 2019-03-20, 2019-06-13, 0 2 , ethel , 2018-01-17, 2019-03-20

Why won't the exp function work in pyspark?

I'm trying to calculate odds ratios from the coefficients of a logistic regression but I'm encountering a problem best summed up by this code: import pyspark.sql.functions as F F.exp(1.2) This fails with py4j.Py4JException: Method exp([class java.lang.Double]) does not exist An integer fails similarly. I don't get how a Double can cause a problem for the exp function?
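pyspark.sql.functions.exp expects a Column (or a column name), not a bare Python number; wrapping the literal, or using math.exp for plain numbers, avoids the Py4J method-lookup error. A minimal sketch:

import math
import pyspark.sql.functions as F
F.exp(F.lit(1.2))   # Column expression, usable inside select/withColumn
math.exp(1.2)       # plain Python float, for driver-side arithmetic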

multiply pyspark array column by a scalar

I am trying to multiply an array-typed column by a scalar. This scalar is also a value from the same pyspark dataframe. For example, I have such a dataframe: df = sc.parallelize([([1, 2],3)]).toDF(["l","factor"]) +------+------+ | l|factor| +------+------+ |[1, 2]| 3| +------+------+ What I want to achieve is this: +------+------+ | l|factor| +------+------+ |[3, 6]| 3| +------+------+ This is what I have tried: df.withColumn("l", lit("factor"
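On Spark 2.4+ one way is the SQL higher-order function transform, which multiplies each array element by the row's factor value. A minimal sketch:

from pyspark.sql import functions as F
df2 = df.withColumn("l", F.expr("transform(l, x -> x * factor)"))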

pyspark-strange behavior of count function inside agg

I am using spark 2.4.0 I am observing a strange behavior while using count function to aggregate. from pyspark.sql import functions as F tst=sqlContext.createDataFrame([(1,2),(1,5),(2,None),(2,3),(3,None),(3,None)],schema=['col1','col2']) tst.show() +----+----+ |col1|col2| +----+----+ | 1| 2| | 1| 5| | 2|null| | 2| 3| | 3|null| | 3|null| +----+----+ tst.groupby('col1').agg(F.count('col2')).show() +----+-----------+ |col1|count(col2)| +----+-----------+ | 1| 2| | 3

SCD2 Implementation in Redshift using AWS GLue Pyspark

I have a requirement to move data from S3 to Redshift. Currently I am using Glue for the work. Current Requirement: Compare the primary key of record in redshift table with the incoming file, if a match is found close the old record's end date (update it from high date to current date) and insert the new one. If primary key match is not found then insert the new record. Implementation: I have implemented it in Glue using pyspark with the following steps: Created dataframes which will cover thre

How to convert JSON key-value pairs to records in pyspark?

I have a series of JSONs that look something like this: { id: '121', values: { "Value A": 1, "Value B": 2, "Value C": 3 } } The number of key-value pairs can vary. I want to create a pyspark program that would take this and break this down into a DataFrame that looks something like this: id | key | value ____ _________ ______ 121 | Value A | 1 121 | Value B | 2 121 | Value C | 3 I was able to get the id and value columns
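One way is to parse the values object as a map and explode it into rows; a minimal sketch, assuming a DataFrame with a string column named json holding documents shaped like the example (the column name json is an assumption):

from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, IntegerType
parsed = (
    df.withColumn("id", F.get_json_object("json", "$.id"))
      .withColumn(
          "values",
          F.from_json(
              F.get_json_object("json", "$.values"),
              MapType(StringType(), IntegerType()),
          ),
      )
      # explode the map into one (key, value) row per pair
      .select("id", F.explode("values").alias("key", "value"))
)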

Create a column in pyspark that refers upon itself after the first row

I want to create a column in pyspark that refers to itself after the first row.
Customer | Week | Price | Index change | Column to be created
A | 1 | 10 | 0.5 | 10
A | 2 | 13 | 0.1 | 10 * (1+0.1) = 11
A | 3 | 16 | 0.6 | 11 * (1+0.6) = 17.6
A | 4 | 16 | 0.1 | 17.6 * (1+0.1) = 19.36
There are multiple customers in this dataset, each with 52 weeks. I know I have to use a window function, but I am havi
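A running product like this can be expressed as exp(sum(log(...))) over a window, seeded with each customer's first-week price; a minimal sketch, assuming columns customer, week, price and index_change (names adapted from the table):

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy("customer").orderBy("week")
df2 = (
    df.withColumn("rn", F.row_number().over(w))
      # cumulative product of (1 + index_change), skipping the first week's change
      .withColumn(
          "growth",
          F.exp(
              F.sum(
                  F.when(F.col("rn") > 1, F.log(1 + F.col("index_change"))).otherwise(F.lit(0.0))
              ).over(w)
          ),
      )
      .withColumn("new_col", F.first("price").over(w) * F.col("growth"))
)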

Pyspark groupBy: Get minimum value for column but retrieve value from different column of same row

I'm trying to group my data in PySpark - I have data from cars travelling around a track. I want to group on race id, car, driver etc - but for each group I want to take the first and last recorded times - which I have done below. I also want to take the tyre pressure from the first recorded row. I have tried to do the below but I'm getting the error: "...due to data type mismatch: WHEN expressions in CaseWhen should all be boolean type" Will be grateful for any suggestions! Thanks Raw
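One way to carry the tyre pressure from the earliest row into the aggregation is min of a struct ordered by time, so the whole first row travels with the minimum; a minimal sketch, assuming columns race_id, car, driver, time and tyre_pressure (names are assumptions):

from pyspark.sql import functions as F
agg_df = (
    df.groupBy("race_id", "car", "driver")
      .agg(
          F.min("time").alias("first_time"),
          F.max("time").alias("last_time"),
          # struct min orders by its first field (time), so tyre_pressure
          # comes from the earliest recorded row in the group
          F.min(F.struct("time", "tyre_pressure"))
           .getField("tyre_pressure")
           .alias("first_tyre_pressure"),
      )
)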
