41,016 questions
3
votes
0
answers
56
views
Spark-Redis write loses rows when writing large DataFrame to Redis
I’m experiencing data loss when writing a large DataFrame to Redis using the Spark-Redis connector.
Details:
I have a DataFrame with millions of rows.
Writing to Redis works correctly for small ...
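For context, a minimal sketch of the write path in question, using the documented spark-redis DataFrame API (format org.apache.spark.sql.redis with table and key.column options). One common cause of apparent row loss is key collisions: if key.column is not unique, rows silently overwrite each other in Redis. Host/port and names are placeholders.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("redis-write-sketch")
         .config("spark.redis.host", "localhost")
         .config("spark.redis.port", "6379")
         .getOrCreate())

df = spark.range(1_000_000).withColumnRenamed("id", "row_id")

# If row_id is not unique, rows overwrite each other under the same key.
(df.write
   .format("org.apache.spark.sql.redis")
   .option("table", "my_table")      # Redis key prefix
   .option("key.column", "row_id")   # must be unique per row
   .mode("append")
   .save())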
0
votes
0
answers
35
views
PySpark 3.5.5 CharType in read.csv schema definition
I'm using a PySpark notebook inside of Azure Synapse.
This is my schema definition
qcew_schema = StructType([
StructField( 'area_fips', dataType = CharType(5), ...
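Worth noting: on most PySpark versions, CharType/VarcharType are rejected in user-specified schemas for file sources, so StringType is the usual substitute. A minimal sketch (column name from the snippet; the file path is hypothetical):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# StringType in place of CharType(5); the CSV source generally rejects CHAR.
qcew_schema = StructType([
    StructField("area_fips", StringType(), True),
])

df = spark.read.csv("path/to/qcew.csv", schema=qcew_schema, header=True)

# Optionally enforce the fixed width that CharType(5) implied.
df = df.withColumn("area_fips", F.rpad("area_fips", 5, " "))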
1
vote
0
answers
51
views
PySpark/MongoDB Connector DataException: dataType 'struct' is invalid for 'BsonArray' during ETL
I am running a data ingestion ETL pipeline orchestrated by Airflow using PySpark to read data from MongoDB (using the MongoDB Spark Connector) and load it into a Delta Lake table. The pipeline is ...
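This error usually means the declared or inferred type says struct while some documents hold an array. One workaround (a sketch, not the asker's pipeline; the format name and options follow the MongoDB Spark Connector v10 docs, other names are placeholders) is to read the ambiguous field as a string and parse it explicitly:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

df = (spark.read.format("mongodb")
      .option("connection.uri", "mongodb://localhost:27017")
      .option("database", "etl")
      .option("collection", "events")
      # Force the ambiguous field to a string so mixed object/array
      # documents don't produce a conflicting inferred type.
      .schema(StructType([StructField("payload", StringType(), True)]))
      .load())

# Parse it as an array explicitly once it's in Spark.
payload_type = ArrayType(StructType([StructField("name", StringType())]))
df = df.withColumn("payload", F.from_json("payload", payload_type))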
0
votes
0
answers
15
views
Can I update fs.s3a credentials in hadoop config on existing executors?
I have an application using EKS in AWS that runs a spark session that can run multiple workloads. In each workload, I need to access data from S3 in another AWS account, for which I have STS ...
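For context: Hadoop configuration set on the driver is not pushed to executors that are already running, and a cached FileSystem instance keeps the credentials it was created with. A sketch of the usual per-bucket approach with STS credentials (bucket name is a placeholder; sts_creds is the Credentials dict from an assume_role call, assumed here):

hconf = spark.sparkContext._jsc.hadoopConfiguration()
bucket = "other-account-bucket"

# Per-bucket credentials are read when the FileSystem for that bucket is
# first created; disabling the cache forces re-creation on new tokens.
hconf.set("fs.s3a.impl.disable.cache", "true")
hconf.set(f"fs.s3a.bucket.{bucket}.aws.credentials.provider",
          "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hconf.set(f"fs.s3a.bucket.{bucket}.access.key", sts_creds["AccessKeyId"])
hconf.set(f"fs.s3a.bucket.{bucket}.secret.key", sts_creds["SecretAccessKey"])
hconf.set(f"fs.s3a.bucket.{bucket}.session.token", sts_creds["SessionToken"])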
Advice
0
votes
2
replies
65
views
Address Standardization using Databricks Pyspark
I am trying to convert DataStage code into PySpark. In the existing DataStage code, the Standardize stage is used to standardize US Address, US Area, and US Name. I want to replicate the same logic in ...
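PySpark has no built-in equivalent of the Standardize stage. One commonly substituted approach, plainly not DataStage's own logic, is the open-source usaddress library inside a UDF; a sketch:

import usaddress
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import MapType, StringType

spark = SparkSession.builder.getOrCreate()

@F.udf(MapType(StringType(), StringType()))
def standardize_us_address(addr):
    # usaddress.tag returns (OrderedDict of label -> value, address type);
    # its parse model differs from Standardize, so validate the output.
    if addr is None:
        return None
    try:
        tagged, _ = usaddress.tag(addr)
        return dict(tagged)
    except usaddress.RepeatedLabelError:
        return None

df = spark.createDataFrame([("123 Main St Springfield IL 62701",)], ["addr"])
df = df.withColumn("parsed", standardize_us_address("addr"))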
0
votes
1
answer
61
views
How to do bucket logic in partition for Iceberg Table using AWS Glue?
# =====================================================
# 🧊 Step 4. Write Data to Iceberg Table (Glue Catalog)
# =====================================================
table_name = "glue_catalog....
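In Iceberg, bucketing is declared in the table's partition spec via the bucket transform rather than in the write itself. A sketch, assuming a catalog named glue_catalog is already configured and using placeholder database/table/column names:

# Bucketed partitioning lives in the DDL, not the writer.
spark.sql("""
    CREATE TABLE IF NOT EXISTS glue_catalog.db.events (
        id BIGINT,
        user_id BIGINT,
        ts TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (days(ts), bucket(16, user_id))
""")

# Writes then route rows into buckets automatically.
df.writeTo("glue_catalog.db.events").append()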
3
votes
1
answer
146
views
Why does appending data with PySpark raise a "SQLServerException: CREATE TABLE permission denied" exception? [closed]
In my Databricks cluster I'm trying to write a DataFrame to my table with the following code:
df.write.jdbc(url=JDBCURL, table=table_name, mode="append")
And this line fails with
...
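For context: with mode("append"), Spark first checks whether the table exists as seen from the JDBC connection and issues CREATE TABLE when that check comes back empty, which is where the permission error surfaces. A sketch of one common fix, schema-qualifying the table name (dbo.my_table is a placeholder; JDBCURL is assumed defined as in the question):

(df.write
   .format("jdbc")
   .option("url", JDBCURL)
   .option("dbtable", "dbo.my_table")  # schema-qualified so the existence
   .mode("append")                     # check finds the table and no
   .save())                            # CREATE TABLE is attempted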
2
votes
0
answers
98
views
Loading a large multiline CSV file using pyspark is extremely slow
I've got a multiline CSV file which is about 150GB, and I've been trying to load it using the usual code, e.g.
df = spark.read.format('csv').option('header', True).option('multiLine', True).load('path/...
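Worth noting: with multiLine=true the CSV input is not splittable, so a single 150GB file is read by one task. A sketch of the usual mitigations, reading once and repartitioning for downstream work (the partition count is illustrative; pre-splitting the file helps the read itself):

df = (spark.read.format("csv")
      .option("header", True)
      .option("multiLine", True)
      .option("escape", '"')   # often needed with quoted embedded newlines
      .load("path/to/big.csv"))

# Spread downstream stages across the cluster after the single-task read.
df = df.repartition(512)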
0
votes
0
answers
85
views
Spark OutOfMemoryError when reading large JSON file (3.5GB) as wholeText due to colon in path
I’m trying to load JSON data into an Iceberg table. The source files are named with timestamps that include colons (:), so I need to read them as plain text first. Additionally, each file is in a ...
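One likely factor: wholetext materializes each file as a single row, so a 3.5GB file has to fit in one executor's memory as one value. A sketch of an alternative, renaming the objects to colon-free keys first (boto3, assuming S3; bucket, key, and table names are placeholders) so the regular splittable JSON reader can be used:

import boto3

s3 = boto3.client("s3")
bucket, bad_key = "my-bucket", "in/2024-01-01T12:00:00.json"

# Copy to a colon-free key; Hadoop paths reject colons in file names.
s3.copy_object(Bucket=bucket,
               Key=bad_key.replace(":", "-"),
               CopySource={"Bucket": bucket, "Key": bad_key})

df = spark.read.json(f"s3a://{bucket}/in/2024-01-01T12-00-00.json")
df.writeTo("glue_catalog.db.target").append()  # Iceberg table placeholder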
0
votes
0
answers
66
views
Unexpected Write Behavior when using MERGE INTO/INSERT INTO Iceberg Spark Queries
I am observing different write behaviors when executing queries on EMR Notebook (correct behavior) vs when using spark-submit to submit a spark application to EMR Cluster (incorrect behavior).
When I ...
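A common cause of this split is that the EMR notebook session carries Iceberg's SQL extensions and catalog settings that the spark-submit application lacks. A sketch of making them explicit in the application itself (catalog name and warehouse path are placeholders):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.extensions",
                 "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .config("spark.sql.catalog.glue_catalog",
                 "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.glue_catalog.catalog-impl",
                 "org.apache.iceberg.aws.glue.GlueCatalog")
         .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-warehouse/")
         .getOrCreate())

spark.sql("MERGE INTO glue_catalog.db.target t USING updates u "
          "ON t.id = u.id "
          "WHEN MATCHED THEN UPDATE SET * "
          "WHEN NOT MATCHED THEN INSERT *")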
1
vote
1
answer
82
views
Apache Spark TransformWithState operator not working as expected
Hi, I'm trying to implement a StatefulProcessor for my custom logic. We are streaming, and I want the custom logic to calculate packet loss from the previous row.
I implemented the StatefulProcessor ...
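For reference, a rough sketch of the shape such a processor takes with PySpark 4.0's transformWithStateInPandas; the method and state-handle names follow that API, but since the failing code isn't shown, treat the details here (state schema, key handling, column names) as assumptions to verify:

import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle

class PacketLossProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle):
        # Holds the previous row's packet counter per grouping key.
        self._prev = handle.getValueState("prev_count", "count BIGINT")

    def handleInputRows(self, key, rows, timerValues):
        for pdf in rows:
            for count in pdf["count"]:
                prev = self._prev.get()[0] if self._prev.exists() else None
                loss = None if prev is None else int(count) - prev
                self._prev.update((int(count),))
                yield pd.DataFrame({"device": [key[0]], "loss": [loss]})

    def close(self):
        pass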
1
vote
1
answer
247
views
Unable to import pyspark.pipelines module
What could cause the following error in my code in a Databricks notebook, and how can I fix it?
ImportError: cannot import name 'pipelines' from 'pyspark' (/databricks/python/lib/...
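As I understand it, pyspark.pipelines (Declarative Pipelines) ships only with Spark 4.1+ and the Lakeflow Declarative Pipelines runtime on Databricks, so the import fails on ordinary runtimes. A quick diagnostic sketch:

import pyspark
print(pyspark.__version__)  # earlier than 4.1 explains the ImportError

try:
    from pyspark import pipelines as dp
except ImportError:
    print("pyspark.pipelines is unavailable in this runtime; "
          "run the code inside a (Lakeflow) Declarative Pipeline instead.")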
0
votes
0
answers
44
views
Spark: VSAM File read issue with special character
We have a scenario where we read a VSAM file directly, along with a copybook to determine the column lengths; we were using the Cobrix library as part of the Spark read.
However, we could see the same is not properly ...
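When Cobrix output shows garbage in place of special characters, the EBCDIC code page is the usual suspect. A sketch of the read (option names follow Cobrix's documented reader; paths and the code page are placeholders):

df = (spark.read.format("cobol")
      .option("copybook", "/path/to/copybook.cpy")
      .option("encoding", "ebcdic")
      .option("ebcdic_code_page", "cp037")  # pick the code page your data uses
      .load("/path/to/vsam/export"))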
0
votes
0
answers
45
views
PySpark Create Dataframe with List Columns [duplicate]
Why does the following code produce the desired dataframe without issue
data = [
("James,,Smith",["Java","Scala","C++"],["Spark","Java"],&...
2
votes
2
answers
102
views
Union of tiny dataframes exhausts resource on Databricks
As part of a function I create df1 and df2 and aim to stack them and output the results. But the results do not display within the function, nor when I output the results and display them afterwards.
results = ...
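A likely mechanism: each union grows the logical plan, and plan analysis on the driver, not the data volume, is what exhausts resources. A sketch that unions once and breaks the lineage before display (df1/df2 are the question's frames):

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1, df2]
stacked = reduce(DataFrame.unionByName, dfs)
stacked = stacked.localCheckpoint()  # truncates the lineage before display
stacked.show()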
0
votes
0
answers
74
views
Error when reading .csv.gz files in databricks
I have small files in .csv.gz compressed format in a GCS bucket; I have mounted it and created external volumes on top of it in Databricks (Unity Catalog enabled). So when I try to read a file with ...
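For reference, the CSV source decompresses .csv.gz transparently, so an explicit format plus the volume path is normally sufficient; a sketch with placeholder catalog/schema/volume names:

df = (spark.read.format("csv")
      .option("header", True)
      .option("inferSchema", True)
      # gzip is detected from the .gz extension; no extra option needed
      .load("/Volumes/my_catalog/my_schema/my_volume/data/file.csv.gz"))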
0
votes
0
answers
94
views
High memory swap in spark streaming
I am relatively new to Spark streaming but quite experienced with normal batch processing.
I grab data from Event Hubs in Azure using the Kafka connector.
Cluster: Standard_Ds3_v2 with 16GB RAM 4 cores.
It ...
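One common cause: without a cap, the first micro-batch pulls the entire Event Hubs backlog into memory. A sketch bounding each batch via maxOffsetsPerTrigger (connection values are placeholders; the Event Hubs Kafka endpoint additionally needs SASL options not shown here):

df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "myhub.servicebus.windows.net:9093")
      .option("subscribe", "my-topic")
      .option("maxOffsetsPerTrigger", 100_000)  # records per micro-batch, tune
      .option("startingOffsets", "latest")
      .load())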
0
votes
0
answers
99
views
Pyspark error py4j.protocol.Py4JJavaError
I keep running into this issue when running PySpark.
I was able to connect to my database and retrieve data, but whenever I try to do operations like .show() or .count(), or when I try to save a Spark ...
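Worth remembering: reads are lazy, so a broken fetch only fails at .show() or .count(), and Py4JJavaError merely wraps the real JVM exception. A small sketch for surfacing the root cause:

try:
    df.show()
except Exception as e:
    # Py4JJavaError carries the underlying Java exception; print it to see
    # the actual driver/memory/type problem rather than the Python wrapper.
    print(getattr(e, "java_exception", e))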
0
votes
2
answers
79
views
Running a notebook through Spark Connect fails to operate with Kafka
I have created this Docker Compose file:
# Command: docker stack deploy streaming-stack --compose-file docker/spark-kstreams-stack.yml
# Gary A. Stafford (2022-09-14)
# Updated: 2022-12-28
version: &...
0
votes
1
answer
102
views
pyspark on Windows - unexpected termination during collect()
I am new to Python and PySpark.
I'm trying to run it on Windows Server 2022. I have environment variables
HADOOP_HOME=C:\spark\hadoop
JAVA_HOME=C:\Program Files\Microsoft\jdk-17.0.16.8-hotspot
...
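On Windows, abrupt termination during collect() is frequently the Python worker failing to start (interpreter mismatch) or a missing winutils.exe/hadoop.dll under %HADOOP_HOME%\bin. A sketch of a minimal check that pins the worker interpreter:

import os, sys

# Use the same interpreter for driver and workers before creating the session.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
print(spark.range(10).collect())  # minimal repro before the real job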
1
vote
1
answer
67
views
Pyspark - Resolve nested json file into multiple columns using inbuilt spark functions
I want to parse a JSON request and create multiple columns out of it in pyspark as follows:
{
"ID": "abc123",
"device": "mobile",
"Ads": [
{
...
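The built-in route is from_json with an explicit schema followed by explode on the Ads array. A sketch; the inner Ad fields are placeholders since the request shown is truncated, and raw_df/json_str are assumed names:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

schema = StructType([
    StructField("ID", StringType()),
    StructField("device", StringType()),
    StructField("Ads", ArrayType(StructType([
        StructField("ad_id", StringType()),  # placeholder inner fields
    ]))),
])

parsed = (raw_df
          .withColumn("j", F.from_json("json_str", schema))
          .select("j.ID", "j.device", F.explode("j.Ads").alias("ad"))
          .select("ID", "device", "ad.*"))  # one row per Ad, flattened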
0
votes
1
answer
113
views
Is there a way to eagerly access the datatype of a Column object in (Py)Spark?
Given an arbitrary pyspark.sql.column.Column object (or, similarly, a pyspark.sql.connect.column.Column object), is there a way to get a datatype back -- either as a DDL string or pyspark.sql.types....
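A Column object carries no type until it is resolved against a plan, so the standard workaround is to project it against a DataFrame and read the resulting schema; a sketch (df is an assumed frame with an amount column):

from pyspark.sql import functions as F

def column_dtype(df, col):
    """Resolve `col` against `df` and return its DataType eagerly."""
    return df.select(col).schema[0].dataType

dtype = column_dtype(df, F.col("amount") * 2)
print(dtype)                  # e.g. DoubleType()
print(dtype.simpleString())   # DDL-style string form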
0
votes
0
answers
77
views
How to merge small parquet files in Hudi into larger files
I use Spark + Hudi to write data into S3. I was writing data in bulk_insert mode, which caused many small parquet files in the Hudi table.
Then I tried to schedule clustering on the Hudi table:
...
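For reference, a sketch of inline clustering configs that compact small files into larger ones; the keys follow Hudi's documented clustering options, the values are illustrative:

hudi_options = {
    "hoodie.table.name": "my_table",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.clustering.inline": "true",
    "hoodie.clustering.inline.max.commits": "4",
    # files below this size are clustering candidates (bytes)
    "hoodie.clustering.plan.strategy.small.file.limit": str(300 * 1024 * 1024),
    # target size for rewritten files (bytes)
    "hoodie.clustering.plan.strategy.target.file.max.bytes": str(1024 * 1024 * 1024),
}

(df.write.format("hudi")
   .options(**hudi_options)
   .mode("append")
   .save("s3://bucket/path/to/table"))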
2
votes
1
answer
148
views
PySpark persist() placement for a frequently mutating df
Persist() is helpful when the same DataFrame is used repeatedly in the code.
But what about cases where transformations are stacked on top of each other?
a = spark.createDataFrame(data)
trigger action on a
...
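One pattern that fits this shape is rolling persistence: cache the stage you are about to act on and release the stage it was derived from. A sketch (data and the column x are placeholders):

a = spark.createDataFrame(data).persist()
a.count()                          # action on a

b = a.withColumn("x2", a["x"] * 2).persist()
b.count()                          # action on b
a.unpersist()                      # a is no longer needed

c = b.filter("x2 > 10").persist()
c.count()
b.unpersist()                      # keep only the live stage cached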
0
votes
0
answers
51
views
Job overrun / OOM after changing the partition
I have a PySpark job that ingests data into a Delta table originally partitioned by year, month, day and hour. The job takes 2hr to complete. The job runs daily ingesting previous days full data. ...