I’m trying to run a transformation function in a PySpark script:
    # imports needed by to_long (not shown in the original snippet)
    from pyspark.sql.functions import array, col, explode, lit, struct

    # glueContext comes from the standard Glue job boilerplate
    datasource0 = glueContext.create_dynamic_frame.from_catalog(
        database="dev", table_name="test_csv", transformation_ctx="datasource0"
    )
    ...
    dataframe = datasource0.toDF()
    ...
    def to_long(df, by):
        # Filter dtypes and split into column names and type description
        cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
        # Spark SQL supports only homogeneous columns
        assert len(set(dtypes)) == 1, "All columns have to be of the same type"
        # Create and explode an array of (column_name, column_value) structs
        kvs = explode(array([
            struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
        ])).alias("kvs")
        return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

    # to_long(df, ["A"])
    ...
    df = to_long(dataframe, ["Name", "Type"])
My dataset looks like this:
    Name    |01/01(FRI)|01/02(SAT)
    ALZA CZ |     0    |     0
    CLPA CZ |     1    |     5
My desired output is something like this:
    Name    |Type |Date       |Value
    ALZA CZ |New  |01/01(FRI) |  0
    CLPA CZ |New  |01/01(FRI) |  1
    ALZA CZ |Old  |01/02(SAT) |  1
    CLPA CZ |Old  |01/02(SAT) |  5
However, the last code line gives me an error similar to this:
AnalysisException: Cannot resolve 'Name' given input columns 'col10'
When I check:
    df.show()
I see 'col1', 'col2', etc. in the first row instead of the actual labels (["Name", "Type"]). Should I separately remove and then re-add the original column titles?
Answer
It seems that your metadata table was configured using the built-in CSV classifier. If this classifier isn't able to detect a header, it names the columns col1, col2, and so on.
Your problem lies one stage before your ETL job, so in my opinion you shouldn't remove and re-add the original column titles; instead, fix your data import / schema detection by using a custom classifier.