Introduction
When returning a DenseVector from a PySpark UDF, use VectorUDT() from pyspark.ml.linalg as the return type. Do not use ArrayType(DoubleType()): it produces an array column (plain Python lists inside the UDF), not a vector column that ML pipelines can consume. For UDFs that take vectors as input, the column is already stored as VectorUDT, so inside the UDF each value arrives as a DenseVector or SparseVector object. If you need better performance, consider a pandas_udf that operates on ArrayType(DoubleType()) columns, converting vectors to arrays before the call and back to vectors afterward.
Returning a DenseVector from a UDF
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, Vectors, VectorUDT

spark = SparkSession.builder.getOrCreate()

# UDF that returns a DenseVector
@udf(returnType=VectorUDT())
def normalize_vector(vector):
    values = vector.toArray()
    norm = float(sum(v**2 for v in values) ** 0.5)
    if norm == 0:
        return vector
    return DenseVector([v / norm for v in values])

# Create sample data with vectors
df = spark.createDataFrame([
    (1, Vectors.dense([3.0, 4.0])),
    (2, Vectors.dense([1.0, 0.0])),
    (3, Vectors.dense([0.0, 0.0])),
], ["id", "features"])

# Apply the UDF
result = df.withColumn("normalized", normalize_vector("features"))
result.show(truncate=False)
# +---+---------+----------+
# |id |features |normalized|
# +---+---------+----------+
# |1  |[3.0,4.0]|[0.6,0.8] |
# |2  |[1.0,0.0]|[1.0,0.0] |
# |3  |[0.0,0.0]|[0.0,0.0] |
# +---+---------+----------+
```
Common Mistake: Using ArrayType Instead of VectorUDT
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# WRONG: Returns a plain list, not a vector
@udf(returnType=ArrayType(DoubleType()))
def bad_normalize(vector):
    values = vector.toArray()
    norm = float(sum(v**2 for v in values) ** 0.5)
    if norm == 0:
        return values.tolist()
    return [v / norm for v in values]

# This produces an array<double> column, not a vector column.
# ML transformers will fail with an error along these lines
# (abbreviated; exact wording varies by Spark version):
# IllegalArgumentException: Column features must be of type VectorUDT
```
If you accidentally return an array, convert it back to a vector:
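```python
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

@udf(returnType=VectorUDT())
def array_to_vector(arr):
    # Vectors.dense builds a DenseVector from a list of floats
    return Vectors.dense(arr)

# "features_array" stands for the array<double> column produced by mistake
df = df.withColumn("features_vec", array_to_vector("features_array"))
```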
Accepting Vectors as UDF Input
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# UDFs that accept a vector and return a scalar
@udf(returnType=DoubleType())
def vector_sum(vector):
    return float(vector.toArray().sum())

@udf(returnType=DoubleType())
def vector_max(vector):
    return float(vector.toArray().max())

result = df.withColumn("sum", vector_sum("features")) \
    .withColumn("max", vector_max("features"))
result.show()
```
Inside the UDF, the vector parameter is a DenseVector or SparseVector object from pyspark.ml.linalg. Use .toArray() to convert it to a numpy array for computation.
Working with SparseVector
```python
from pyspark.ml.linalg import DenseVector, Vectors, VectorUDT
from pyspark.sql.functions import udf

# UDF that handles both dense and sparse vectors
@udf(returnType=VectorUDT())
def double_vector(vector):
    arr = vector.toArray()  # Works for both DenseVector and SparseVector
    return DenseVector(arr * 2)  # the result is always dense

df = spark.createDataFrame([
    (1, Vectors.dense([1.0, 2.0, 3.0])),
    (2, Vectors.sparse(3, {0: 1.0, 2: 3.0})),  # [1.0, 0.0, 3.0]
], ["id", "features"])

result = df.withColumn("doubled", double_vector("features"))
result.show(truncate=False)
```
Better Performance with pandas_udf
```python
import numpy as np
import pandas as pd
from pyspark.ml.functions import vector_to_array  # Spark 3.0+
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import pandas_udf, udf
from pyspark.sql.types import ArrayType, DoubleType

# Step 1: Use pandas_udf with array type (typically much faster)
@pandas_udf(ArrayType(DoubleType()))
def normalize_pandas(vectors: pd.Series) -> pd.Series:
    def norm(arr):
        a = np.asarray(arr, dtype=float)
        n = np.linalg.norm(a)
        return (a / n).tolist() if n > 0 else a.tolist()
    return vectors.apply(norm)

# Step 2: Convert array back to vector
@udf(returnType=VectorUDT())
def to_vector(arr):
    return Vectors.dense(arr)

# Apply: the pandas_udf expects an array column, so convert the
# VectorUDT column with vector_to_array first
result = df.withColumn("norm_arr", normalize_pandas(vector_to_array("features"))) \
    .withColumn("normalized", to_vector("norm_arr")) \
    .drop("norm_arr")
```
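pandas_udf processes data in batches using Apache Arrow, which is typically much faster than row-at-a-time UDFs. The trade-off is that Arrow has no native representation for VectorUDT, so you convert vectors to arrays before the pandas_udf and back to vectors afterward. Note that the to_vector step is itself a row-at-a-time UDF; on Spark 3.1 and later, pyspark.ml.functions.array_to_vector performs that conversion natively in the JVM.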
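The norm helper in Step 1 still runs once per element through Series.apply. When every array in a batch has the same length, one way to push the work into numpy is to stack the batch into a 2-D array. A sketch under that equal-length assumption; normalize_batch is a hypothetical name, shown as a plain function so it runs without Spark:

```python
import numpy as np
import pandas as pd

def normalize_batch(arrays: pd.Series) -> pd.Series:
    """Scale each equal-length array in the batch to unit L2 norm."""
    if len(arrays) == 0:
        return pd.Series([], dtype=object)
    mat = np.stack(arrays.to_numpy())  # shape: (rows, dim)
    norms = np.linalg.norm(mat, axis=1, keepdims=True)
    safe = np.where(norms > 0, norms, 1.0)  # zero rows stay zero
    return pd.Series((mat / safe).tolist(), index=arrays.index)

batch = pd.Series([np.array([3.0, 4.0]), np.array([0.0, 0.0]), np.array([1.0, 0.0])])
out = normalize_batch(batch)
```

To use it in Spark, wrap it as pandas_udf(normalize_batch, ArrayType(DoubleType())) and apply it to an array<double> column (for example one produced by pyspark.ml.functions.vector_to_array). If array lengths can differ within a batch, np.stack raises ValueError, so the per-element apply is the safer default.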
Common Pitfalls
Using ArrayType(DoubleType()) as the return type instead of VectorUDT(): ML pipelines and transformers require VectorUDT columns. An ArrayType column looks similar but is rejected by estimators like LogisticRegression, KMeans, and StandardScaler with an IllegalArgumentException.
Importing DenseVector from pyspark.mllib.linalg instead of pyspark.ml.linalg: PySpark has two vector packages — mllib (legacy RDD-based) and ml (DataFrame-based). Mixing them causes type mismatches. Always use pyspark.ml.linalg for DataFrame UDFs.
Not calling .toArray() before numpy operations: DenseVector supports some numpy-style operations but not all, and SparseVector supports fewer. Calling .toArray() first converts either type to a numpy.ndarray, so numpy functions behave as expected.
Returning a numpy array instead of a vector from the UDF: If the UDF return type is VectorUDT(), you must return a DenseVector or SparseVector object; a numpy array or plain list cannot be serialized as a vector and fails with a TypeError in the Python worker. Wrap the result: return DenseVector(my_numpy_array).
Using regular UDFs for large-scale vector operations: Regular Python UDFs pickle data between the JVM and the Python worker and invoke your function once per row, which is slow. For batch operations on vectors, use pandas_udf with ArrayType and convert to vectors afterward, or use built-in ML transformers such as Normalizer when possible.
Summary
Use VectorUDT() from pyspark.ml.linalg as the return type for UDFs that return vectors
Inside UDFs, call .toArray() to convert vectors to numpy arrays for computation
Return DenseVector(values) from the UDF, not a plain list or numpy array
Use pandas_udf with ArrayType(DoubleType()) for better performance: convert vectors to arrays first (for example with vector_to_array), then convert the results back to vectors
Always import from pyspark.ml.linalg, not pyspark.mllib.linalg