How to share variables & logging in Apache Flink?

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It is designed to run in all common cluster environments. Because a job's operators run as parallel tasks spread across machines, sharing data between those tasks and collecting their logs both take deliberate setup. This article covers the main ways to share variables and to configure logging in Apache Flink.

Due to the distributed nature of Flink, sharing variables is not as straightforward as in a single-process application. Variable sharing in Flink mainly involves understanding two kinds of state: operator state and broadcast state.
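One reason an ordinary field cannot act as a shared variable is that Flink serializes each user function and hands a separate copy to every parallel subtask. The plain-Java sketch below imitates that with Java serialization. It uses no Flink API, and CountingFunction and SerializedCopies are hypothetical names made up for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a user function that keeps a counter in a plain field.
class CountingFunction implements Serializable {
    private int seen = 0;

    String map(String value) {
        seen++;
        return value + " #" + seen;
    }

    int seen() {
        return seen;
    }
}

class SerializedCopies {
    // Round-trips an object through Java serialization, yielding an independent copy.
    static <T extends Serializable> T copy(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T clone = (T) in.readObject();
                return clone;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Copy via serialization failed", e);
        }
    }

    public static void main(String[] args) {
        CountingFunction original = new CountingFunction();
        // Each "subtask" works on its own deserialized copy.
        CountingFunction subtask0 = copy(original);
        CountingFunction subtask1 = copy(original);

        subtask0.map("a");
        subtask0.map("b");
        subtask1.map("c");

        // Neither the original nor the other copy sees the updates.
        System.out.println("original=" + original.seen()
            + " subtask0=" + subtask0.seen()
            + " subtask1=" + subtask1.seen());
    }
}
```

Each copy counts only the records it handled itself, which is why values that must survive failures or be visible to every instance belong in Flink-managed state rather than in fields.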

Operator State

Operator state is tied to one specific parallel instance (subtask) of an operator. It retains data across the records that instance processes, so a value stored there is visible only within that instance. Note that the example below uses ValueState, which is keyed state: it is scoped further to the current key, so the function must run on a keyed stream (after keyBy). Here’s an example:

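java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;

// ValueState is keyed state: apply this function after keyBy(...).
public class StatefulMapFunction extends RichMapFunction<String, String> {
    private transient ValueState<Integer> countState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Expire a key's count 1 second after it was last read or written.
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
            "text-count", // the state name
            Types.INT); // type information
        descriptor.enableTimeToLive(ttlConfig);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public String map(String value) throws Exception {
        // value() returns null when nothing is stored for the current key (or it has expired).
        Integer currCount = countState.value();
        if (currCount == null) {
            currCount = 0;
        }
        // Report the count including the current element.
        int newCount = currCount + 1;
        countState.update(newCount);
        return "Count of " + value + " : " + newCount;
    }
}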
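The ValueState used above is keyed state and needs a keyBy upstream. For state scoped to a parallel instance regardless of key, Flink provides the CheckpointedFunction interface together with ListState obtained from the operator state store. The following is a minimal sketch under that assumption; the class name PerSubtaskCounter and the state name "subtask-count" are hypothetical.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Hypothetical example: counts the records seen by each parallel instance.
class PerSubtaskCounter implements MapFunction<String, String>, CheckpointedFunction {
    private transient ListState<Long> checkpointedCount;
    private long count;

    @Override
    public String map(String value) {
        count++;
        return value + " (record " + count + " on this subtask)";
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot with the current in-memory count.
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
            new ListStateDescriptor<>("subtask-count", Types.LONG);
        checkpointedCount = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            // After rescaling, an instance may receive zero or several entries; sum them.
            for (Long restored : checkpointedCount.get()) {
                count += restored;
            }
        }
    }
}
```

This function works on a non-keyed stream, for example stream.map(new PerSubtaskCounter()), and the count is only written to managed state when a checkpoint is taken.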

Broadcast State

Broadcast state allows the same state to be available on all parallel instances of an operator. Each instance holds its own copy, and the copies stay identical because every instance receives every element of the broadcast stream. It is designed for use cases where some data is a contextual prerequisite for processing each incoming element, which makes it ideal for sharing configuration settings or rule parameters across all tasks.

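java
// Belongs in a BroadcastProcessFunction (or KeyedBroadcastProcessFunction) whose
// mapStateDescriptor field is a MapStateDescriptor<String, String>.
@Override
public void processBroadcastElement(Tuple2<String, String> value,
                                    Context ctx,
                                    Collector<String> out) throws Exception {
    // Runs on every parallel instance, so each copy of the broadcast state gets the entry.
    ctx.getBroadcastState(mapStateDescriptor).put(value.f0, value.f1);
}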
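The method above is only one half of the pattern: the descriptor has to be declared, the configuration stream has to be broadcast, and the two streams have to be connected. A minimal sketch of that wiring might look as follows. The names ConfigBroadcast, Tagger, the state name "config", and the "prefix" key are all hypothetical, and the two input streams are assumed to exist already.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

class ConfigBroadcast {
    // Describes the broadcast state: a map from setting name to setting value.
    static final MapStateDescriptor<String, String> CONFIG =
        new MapStateDescriptor<>("config", Types.STRING, Types.STRING);

    // Connects the event stream with the broadcast configuration stream.
    static DataStream<String> apply(DataStream<String> events,
                                    DataStream<Tuple2<String, String>> configUpdates) {
        BroadcastStream<Tuple2<String, String>> broadcast = configUpdates.broadcast(CONFIG);
        return events.connect(broadcast).process(new Tagger());
    }

    static class Tagger
            extends BroadcastProcessFunction<String, Tuple2<String, String>, String> {

        @Override
        public void processElement(String event, ReadOnlyContext ctx,
                                   Collector<String> out) throws Exception {
            // Read-only access on the event side; the setting may not have arrived yet.
            String prefix = ctx.getBroadcastState(CONFIG).get("prefix");
            out.collect((prefix == null ? "" : prefix) + event);
        }

        @Override
        public void processBroadcastElement(Tuple2<String, String> update, Context ctx,
                                            Collector<String> out) throws Exception {
            // Write access on the broadcast side only.
            ctx.getBroadcastState(CONFIG).put(update.f0, update.f1);
        }
    }
}
```

Flink gives no ordering guarantee between the two inputs, so the event side should handle a missing entry, as the null check does here.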

Logging is essential for debugging and monitoring Flink applications. Flink uses SLF4J as its logging facade, allowing the flexibility to use any logging framework that supports SLF4J, such as Log4j2, Logback, or the JDK Logger.
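In user code this usually means obtaining an SLF4J Logger and calling it from inside a function. The sketch below shows the general shape; LoggingMapFunction is a hypothetical example class, and only the Logger usage is the point.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical example function that logs while it transforms records.
class LoggingMapFunction implements MapFunction<String, String> {
    // A static logger is not part of the serialized function object.
    private static final Logger LOG = LoggerFactory.getLogger(LoggingMapFunction.class);

    @Override
    public String map(String value) {
        // Parameterized messages avoid building the string when DEBUG is disabled.
        LOG.debug("Processing record: {}", value);
        if (value.isEmpty()) {
            LOG.warn("Received an empty record");
        }
        return value.toUpperCase();
    }
}
```

Messages logged this way are written by the TaskManager process that runs the subtask, so they appear in that TaskManager's log rather than in the client's console.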

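To configure logging in Flink, modify the log4j.properties or logback.xml file, typically located in the conf directory of your Flink installation. Here’s an example of a log4j.properties setup, written in Log4j 1.x syntax; Flink 1.11 and later ship with Log4j 2, whose properties format uses different keys: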

properties
# Log4j 1.x syntax: root logger at INFO, writing to the appender named "file"
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=flink.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

This example sets the logger to capture INFO level logs and above, outputting to a file named flink.log.
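The keys above follow Log4j 1.x syntax. On Flink 1.11 and later, which bundle Log4j 2, a roughly equivalent log4j.properties might look like the sketch below. The appender name FileAppender and the key prefix file are arbitrary labels chosen for illustration.

```properties
# Log4j 2 properties syntax (Flink 1.11+). "file" and "FileAppender" are illustrative names.
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = FileAppender

appender.file.name = FileAppender
appender.file.type = File
appender.file.fileName = flink.log
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
```

The configuration shipped with Flink typically takes the file name from the log.file system property rather than hard-coding it, so a fixed name like flink.log is best treated as a simplification.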

Summary Table

| Feature | Description | Example Use-Cases |
|---|---|---|
| Operator State | State scoped to specific sub-tasks of an operator | Counting elements, aggregating user behavior |
| Broadcast State | State that is shared across all parallel instances of an operator | Sharing rule sets or configurations |
| Logging | Provides monitoring and debugging capabilities connected through the SLF4J logging facade | Tracking application flow, error handling |

Additional Considerations

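  • Initialization of State: Obtain state handles in the open() method of Rich functions, because the runtime context is not yet available in the constructor.
  • State Backends: Choose an appropriate state backend based on your application's performance and durability requirements, e.g., RocksDB for state that outgrows memory, or a heap-based backend such as FsStateBackend (superseded by HashMapStateBackend in newer Flink releases).
  • Logging Level: Adjust logging levels to the deployment stage: DEBUG during development, and a less verbose level such as INFO or WARN in production.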

By mastering shared variable handling and logging, developers can build robust, efficient, and maintainable Flink applications tailored for scalable streaming architectures.

