I'm taking an Akka Streams Udemy course and this code is provided :
package multiflow;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.SinkShape;
import akka.stream.SourceShape;
import akka.stream.javadsl.*;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public class Main {
public static void main(String[] args) {
Map<Integer, VehiclePositionMessage> vehicleTrackingMap = new HashMap<>();
for (int i = 1; i <=8; i++) {
vehicleTrackingMap.put(i, new VehiclePositionMessage(1, new Date(), 0,0));
}
//source - repeat some value every 10 seconds.
Source<String, NotUsed> source = Source.repeat("go").throttle(1, Duration.ofSeconds(10));
//flow 1 - transform into the ids of each van (ie 1..8) with mapConcat
Flow<String, Integer, NotUsed> vehicleIds = Flow.of(String.class)
.mapConcat(value -> List.of(1,2,3,4,5,6,7,8));
//flow 2 - get position for each van as a VPMs with a call to the lookup method (create a new instance of
//utility functions each time). Note that this process isn't instant so should be run in parallel.
Flow<Integer, VehiclePositionMessage, NotUsed> vehiclePostions = Flow.of(Integer.class)
.mapAsyncUnordered(8, vehicleId -> {
System.out.println("Requesting Position for vehicle " + vehicleId);
CompletableFuture<VehiclePositionMessage> future = new CompletableFuture<>();
UtilityFunctions utilityFunctions = new UtilityFunctions();
future.completeAsync( () -> utilityFunctions.getVehiclePosition(vehicleId));
return future;
});
//flow 3 - use previous position from the map to calculate the current speed of each vehicle. Replace the
// position in the map with the newest position and pass the current speed downstream
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
.map ( vpm -> {
UtilityFunctions utilityFunctions = new UtilityFunctions();
VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
return speed;
});
//flow 4 - filter to only keep those values with a speed > 95
Flow<VehicleSpeed, VehicleSpeed, NotUsed> speedFilter = Flow.of(VehicleSpeed.class)
.filter(speed -> speed.getSpeed() > 95);
//sink - as soon as 1 value is received return it as a materialized value, and terminate the stream
Sink<VehicleSpeed, CompletionStage<VehicleSpeed>> sink = Sink.head();
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
// CompletionStage<VehicleSpeed> result = source.via(vehicleIds)
// .async()
// .via(vehiclePostions)
// .async()
// .via(vehicleSpeeds)
// .via(speedFilter)
// .toMat(sink, Keep.right())
// .run(actorSystem);
RunnableGraph<CompletionStage<VehicleSpeed>> graph = RunnableGraph.fromGraph(
GraphDSL.create( sink, (builder, out) -> {
SourceShape<String> sourceShape = builder.add(Source.repeat("go")
.throttle(1, Duration.ofSeconds(10)));
//SourceShape<String> sourceShape = builder.add(source);
FlowShape<String,Integer> vehicleIdsShape = builder.add(vehicleIds);
FlowShape<Integer, VehiclePositionMessage> vehiclePositionsShape =
builder.add(vehiclePostions.async());
FlowShape<VehiclePositionMessage, VehicleSpeed> vehicleSpeedsShape =
builder.add(vehicleSpeeds);
FlowShape<VehicleSpeed, VehicleSpeed> speedFilterShape =
builder.add(speedFilter);
//DON'T NEED TO DO THIS - OUT IS OUR SINKSHAPE
//SinkShape<VehicleSpeed> out = builder.add(sink);
builder.from(sourceShape)
.via(vehicleIdsShape)
.via(vehiclePositionsShape);
builder.from(vehicleSpeedsShape)
.via(speedFilterShape)
.to(out);
builder.from(vehiclePositionsShape)
.via(vehicleSpeedsShape);
return ClosedShape.getInstance();
})
);
CompletionStage<VehicleSpeed> result = graph.run(actorSystem);
result.whenComplete( (value, throwable) -> {
if (throwable != null) {
System.out.println("Something went wrong " + throwable);
}
else {
System.out.println("Vehicle " + value.getVehicleId() + " was going at a speed of " + value.getSpeed());
}
actorSystem.terminate();
}) ;
}
}
This is not part of homework but for my own understanding I'm attempting to simplify how Flows are chained together.
I create this new class :
import akka.NotUsed;
import akka.stream.javadsl.Flow;
import lombok.Builder;
import java.util.Map;
public class FlowBuilder {
public static Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> builder(Map<Integer, VehiclePositionMessage> vehicleTrackingMap) {
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
.map ( vpm -> {
UtilityFunctions utilityFunctions = new UtilityFunctions();
VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
return speed;
});
return vehicleSpeeds;
}
}
It is called like so :
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = FlowBuilder.builder(vehicleTrackingMap);
Functionally this works as expected but is there an alternative method/pattern of working with multiple flows and they should be combined. I think using a static is an anti-pattern ?
Update method to call builder :
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = FlowBuilder.builder()
.vehicleTrackingMap(vehicleTrackingMap).build().getFLow();
Flow modified to use builder :
import akka.NotUsed;
import akka.stream.javadsl.Flow;
import lombok.Builder;
import lombok.ToString;
import java.util.Map;
@Builder
@ToString
public class FlowBuilder {
Map<Integer, VehiclePositionMessage> vehicleTrackingMap;
public Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> getFLow() {
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
.map ( vpm -> {
UtilityFunctions utilityFunctions = new UtilityFunctions();
VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
return speed;
});
return vehicleSpeeds;
}
}
Not sure what you're trying to simplify here, but if you want to avoid having all the flows defined in one method just wrap each in its own method in the same class. If you make those methods public you can then unit test each Flow
separately. So something like that
public class MyStream {
public void runStream() {
var myMap = new HashMap<???,???>()
mySource()
.via(myFlow1())
.via(myFlow2(myMap))
.run(....);
}
public Source<???,???> mySource() { ??? }
public Flow<???,???,???> myFlow1() { ??? }
public Flow<???,???,???> myFlow2(Map<???, ???> stuffYouNeedForFlow) {???}
}
Now every Flow
can be tested using testing techniques described here https://doc.akka.io/docs/akka/current/stream/stream-testkit.html