
What type should a dense vector be when used in a PySpark UDF?


Introduction

When returning a DenseVector from a PySpark UDF, use VectorUDT() from pyspark.ml.linalg as the return type. Do not use ArrayType(DoubleType()). It makes the UDF produce a plain array<double> column, not a vector column that ML pipelines can consume. UDFs that take vectors as input need no special declaration. The column is already stored as VectorUDT, and inside the UDF each value arrives as a DenseVector or SparseVector object. If you need better performance, consider a pandas_udf that works on ArrayType(DoubleType()) data. Convert the vector column to an array beforehand and back to a vector afterward.

Returning a DenseVector from a UDF

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, Vectors, VectorUDT

spark = SparkSession.builder.getOrCreate()

# UDF that returns a DenseVector, so the declared return type is VectorUDT()
@udf(returnType=VectorUDT())
def normalize_vector(vector):
    values = vector.toArray()
    norm = float(sum(v**2 for v in values) ** 0.5)
    if norm == 0:
        return vector  # leave zero vectors unchanged
    return DenseVector([v / norm for v in values])

# Create sample data with a vector column
df = spark.createDataFrame([
    (1, Vectors.dense([3.0, 4.0])),
    (2, Vectors.dense([1.0, 0.0])),
    (3, Vectors.dense([0.0, 0.0])),
], ["id", "features"])

# Apply the UDF
result = df.withColumn("normalized", normalize_vector("features"))
result.show(truncate=False)
# +---+---------+----------+
# |id |features |normalized|
# +---+---------+----------+
# |1  |[3.0,4.0]|[0.6,0.8] |
# |2  |[1.0,0.0]|[1.0,0.0] |
# |3  |[0.0,0.0]|[0.0,0.0] |
# +---+---------+----------+
```

Common Mistake: Using ArrayType Instead of VectorUDT

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# WRONG: the UDF returns a plain list, so the column type is array<double>
@udf(returnType=ArrayType(DoubleType()))
def bad_normalize(vector):
    values = vector.toArray()
    norm = float(sum(v**2 for v in values) ** 0.5)
    return [v / norm for v in values]

# This produces an array column, not a vector column.
# ML transformers and estimators then fail with an IllegalArgumentException
# reporting that the column is array<double> rather than the expected vector type.
```

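If you accidentally return an array, convert it back to a vector with a UDF whose return type is VectorUDT(). On Spark 3.1 and later, the built-in pyspark.ml.functions.array_to_vector does the same without a Python UDF.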

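```python
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

@udf(returnType=VectorUDT())
def to_vector(arr):
    # Pass nulls through instead of building a vector from None
    return Vectors.dense(arr) if arr is not None else None

# Assumes df already has an array<double> column named "features_array"
df = df.withColumn("features_vec", to_vector("features_array"))
```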

Accepting a Vector as UDF Input

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# UDFs that accept a vector and return a scalar
@udf(returnType=DoubleType())
def vector_sum(vector):
    return float(vector.toArray().sum())

@udf(returnType=DoubleType())
def vector_max(vector):
    return float(vector.toArray().max())

result = df.withColumn("sum", vector_sum("features")) \
           .withColumn("max", vector_max("features"))
result.show()
```

Inside the UDF, the vector parameter is a DenseVector or SparseVector object from pyspark.ml.linalg. Use .toArray() to convert it to a numpy array for computation.

Working with SparseVector

```python
from pyspark.ml.linalg import DenseVector, Vectors, VectorUDT
from pyspark.sql.functions import udf

# UDF that handles both dense and sparse vectors
@udf(returnType=VectorUDT())
def double_vector(vector):
    arr = vector.toArray()  # numpy array for both DenseVector and SparseVector
    return DenseVector(arr * 2)

df = spark.createDataFrame([
    (1, Vectors.dense([1.0, 2.0, 3.0])),
    (2, Vectors.sparse(3, {0: 1.0, 2: 3.0})),  # [1.0, 0.0, 3.0]
], ["id", "features"])

result = df.withColumn("doubled", double_vector("features"))
result.show(truncate=False)
```

pandas_udf for Better Performance

```python
import numpy as np
import pandas as pd
from pyspark.ml.functions import array_to_vector, vector_to_array
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType

# Step 1: pandas_udf over plain arrays (the Arrow conversion it relies on
# does not handle VectorUDT columns)
@pandas_udf(ArrayType(DoubleType()))
def normalize_pandas(arrays: pd.Series) -> pd.Series:
    def norm(arr):
        arr = np.asarray(arr, dtype=np.float64)
        n = np.linalg.norm(arr)
        return arr / n if n > 0 else arr
    return arrays.apply(norm)

# Step 2: vector -> array before the pandas_udf, array -> vector after it
# (vector_to_array needs Spark 3.0+, array_to_vector needs Spark 3.1+)
result = (
    df.withColumn("norm_arr", normalize_pandas(vector_to_array("features")))
      .withColumn("normalized", array_to_vector("norm_arr"))
      .drop("norm_arr")
)
```

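pandas_udf processes data in batches using Apache Arrow, which avoids much of the per-row Python overhead of regular UDFs. The trade-off is that the Arrow conversion used by pandas_udf does not support VectorUDT (at least in Spark 3.x), so the pandas_udf must work on plain arrays. Convert the vector column with vector_to_array (Spark 3.0+) before the call. Convert the result back with array_to_vector (Spark 3.1+) or with a UDF that returns VectorUDT().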
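The norm helper in normalize_pandas still runs once per row through Series.apply. A more fully vectorized batch body stacks the batch into one 2-D NumPy array and normalizes it in a single call. This sketch assumes every row in a batch has the same length, which holds for arrays produced by vector_to_array on a fixed-size feature column. normalize_batch is a hypothetical name, and the function is plain pandas/NumPy so it can be tested directly.

```python
import numpy as np
import pandas as pd

def normalize_batch(arrays: pd.Series) -> pd.Series:
    """L2-normalize each row of a batch; zero rows are returned unchanged."""
    if arrays.empty:
        return arrays
    # Assumes equal-length rows: stack them into a (rows, dim) matrix.
    matrix = np.stack(arrays.to_numpy()).astype(np.float64)
    norms = np.linalg.norm(matrix, axis=1, keepdims=True)  # shape (rows, 1)
    safe_norms = np.where(norms > 0, norms, 1.0)            # avoid dividing by 0
    return pd.Series(list(matrix / safe_norms), index=arrays.index)

batch = pd.Series([np.array([3.0, 4.0]), np.array([1.0, 0.0]), np.array([0.0, 0.0])])
print(normalize_batch(batch).tolist())
```

To use it in Spark, decorate it with @pandas_udf(ArrayType(DoubleType())) and call it on an array column, for example vector_to_array("features").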

Common Pitfalls

  • Using ArrayType(DoubleType()) as the return type instead of VectorUDT(): ML pipelines and transformers require VectorUDT columns. An ArrayType column looks similar but is rejected by estimators like LogisticRegression, KMeans, and StandardScaler with an IllegalArgumentException.
  • Importing DenseVector from pyspark.mllib.linalg instead of pyspark.ml.linalg: PySpark has two vector packages, mllib (legacy, RDD-based) and ml (DataFrame-based). Their vector classes are distinct, so mixing them causes type mismatches. Always use pyspark.ml.linalg for DataFrame UDFs; an existing mllib vector can be converted with its .asML() method.
  • Not calling .toArray() before numpy operations: DenseVector wraps a NumPy array and supports some NumPy-style operations, but SparseVector supports far fewer. Calling .toArray() first gives a plain numpy.ndarray for either type, so every numpy function works correctly.
  • Returning a numpy array instead of a DenseVector from the UDF: If the UDF return type is VectorUDT(), you must return a DenseVector (or SparseVector) object, not a numpy array or a list; otherwise the value cannot be serialized. Wrap the result: return DenseVector(my_numpy_array). The constructor accepts a NumPy array directly.
  • Using regular UDFs for large-scale vector operations: Row-at-a-time UDFs call the Python function once per row and exchange data between the JVM and Python through pickling, which is slow. For batch operations on vectors, use pandas_udf with ArrayType (converting with vector_to_array first and back to vectors afterward), or use a built-in ML transformer such as Normalizer when one fits.

Summary

  • Use VectorUDT() from pyspark.ml.linalg as the return type for UDFs that return vectors
  • Inside UDFs, call .toArray() to convert vectors to numpy arrays for computation
  • Return DenseVector(values) from the UDF, not a plain list or numpy array
  • Use pandas_udf with ArrayType(DoubleType()) for better performance, converting vectors to arrays first and back to vectors afterward
  • Always import from pyspark.ml.linalg, not pyspark.mllib.linalg
