What is the advantage of using the Akka Actor model over normal file operations? I tried to measure the time taken to analyze a log file. The task is to find the IP addresses that have logged on more than 50 times and print them. The normal file operation turned out to be faster than the Akka Actor model. Why is that?
Using normal file operations:
public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    File file = new File("log.txt");
    Map<String, Long> ipMap = new HashMap<>();
    try (BufferedReader br = new BufferedReader(new FileReader(file))) {
        String line = br.readLine();
        while (line != null) {
            int idx = line.indexOf('-');
            if (idx != -1) {
                String ipAddress = line.substring(0, idx).trim();
                ipMap.put(ipAddress, ipMap.getOrDefault(ipAddress, 0L) + 1);
            }
            line = br.readLine();
        }
        System.out.println("================================");
        System.out.println("||\tCount\t||\t\tIP");
        System.out.println("================================");
        // Sort by value (descending); a LinkedHashMap preserves the sorted order
        Map<String, Long> result = new LinkedHashMap<>();
        ipMap.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .forEachOrdered(x -> result.put(x.getKey(), x.getValue()));
        // Print only if count > 50
        result.entrySet().stream()
                .filter(entry -> entry.getValue() > 50)
                .forEach(entry ->
                        System.out.println("||\t" + entry.getValue() + " \t||\t" + entry.getKey()));
        long endTime = System.currentTimeMillis();
        System.out.println("Time: " + (endTime - startTime));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
Using Actors:
1. The Main Class
public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    // Create the actor system
    ActorSystem akkaSystem = ActorSystem.create("akkaSystem");
    // Create the coordinating actor based on the specified class
    ActorRef coordinator = akkaSystem.actorOf(Props.create(FileAnalysisActor.class));
    // Create a message carrying the file path
    FileAnalysisMessage msg = new FileAnalysisMessage("log.txt");
    // Send a message to start processing the file. 'ask' is an asynchronous
    // request that returns a Future, bounded by the timeout.
    Timeout timeout = new Timeout(6, TimeUnit.SECONDS);
    Future<Object> future = Patterns.ask(coordinator, msg, timeout);
    // Process the result when the Future completes
    final ExecutionContext ec = akkaSystem.dispatcher();
    future.onSuccess(new OnSuccess<Object>() {
        @Override
        public void onSuccess(Object message) throws Throwable {
            if (message instanceof FileProcessedMessage) {
                printResults((FileProcessedMessage) message);
                // Stop the actor system
                akkaSystem.shutdown();
            }
        }

        private void printResults(FileProcessedMessage message) {
            System.out.println("================================");
            System.out.println("||\tCount\t||\t\tIP");
            System.out.println("================================");
            // Sort by value (descending); a LinkedHashMap preserves the sorted order
            Map<String, Long> result = new LinkedHashMap<>();
            message.getData().entrySet().stream()
                    .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                    .forEachOrdered(x -> result.put(x.getKey(), x.getValue()));
            // Print only if count > 50
            result.entrySet().stream()
                    .filter(entry -> entry.getValue() > 50)
                    .forEach(entry ->
                            System.out.println("||\t" + entry.getValue() + " \t||\t" + entry.getKey()));
            long endTime = System.currentTimeMillis();
            System.out.println("Total time: " + (endTime - startTime));
        }
    }, ec);
}
2. The File Analysis Actor Class
public class FileAnalysisActor extends UntypedActor {
    private Map<String, Long> ipMap = new HashMap<>();
    private long fileLineCount;
    private long processedCount;
    private ActorRef analyticsSender = null;

    @Override
    public void onReceive(Object message) throws Exception {
        /*
         * This actor receives two kinds of messages: FileAnalysisMessage and
         * LineProcessingResult. Any other type is discarded via unhandled().
         */
        //System.out.println(Thread.currentThread().getName());
        if (message instanceof FileAnalysisMessage) {
            List<String> lines = FileUtils.readLines(new File(
                    ((FileAnalysisMessage) message).getFileName()));
            fileLineCount = lines.size();
            processedCount = 0;
            // store a reference to the original sender so the results can be sent back later
            analyticsSender = this.getSender();
            for (String line : lines) {
                // create a new actor for each line of the log file
                Props props = Props.create(LogLineProcessor.class);
                ActorRef lineProcessorActor = this.getContext().actorOf(props);
                // send the new actor a message with the line payload
                lineProcessorActor.tell(new LogLineMessage(line), this.getSelf());
            }
        } else if (message instanceof LineProcessingResult) {
            // a result message arrives after a LogLineProcessor actor has finished a line
            String ip = ((LineProcessingResult) message).getIpAddress();
            // increment the IP's counter
            Long count = ipMap.getOrDefault(ip, 0L);
            ipMap.put(ip, ++count);
            // once the entire file has been processed, notify the original sender
            processedCount++;
            if (fileLineCount == processedCount) {
                // send the done message
                analyticsSender.tell(new FileProcessedMessage(ipMap), ActorRef.noSender());
            }
        } else {
            // ignore any other message type
            this.unhandled(message);
        }
    }
}
3. The Log Line Processor Class
public class LogLineProcessor extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof LogLineMessage) {
            // Uncomment to see which data each actor processes:
            //System.out.println("Line: " + ((LogLineMessage) message).getData());
            // Uncomment to see the thread-to-actor relationship:
            //System.out.println("Thread [" + Thread.currentThread().getId() + "] handling [" + getSelf().toString() + "]");
            // the message payload is a single line from the log file
            String messageData = ((LogLineMessage) message).getData();
            int idx = messageData.indexOf('-');
            if (idx != -1) {
                // extract the IP address
                String ipAddress = messageData.substring(0, idx).trim();
                // reply to the sender with the result, using a new message type
                this.getSender().tell(new LineProcessingResult(ipAddress), this.getSelf());
            }
        } else {
            // ignore any other message type
            this.unhandled(message);
        }
    }
}
Message Classes
1. File Analysis Message
public class FileAnalysisMessage {
    private String fileName;

    public FileAnalysisMessage(String file) {
        this.fileName = file;
    }

    public String getFileName() {
        return fileName;
    }
}
2. File Processed Message
public class FileProcessedMessage {
    private Map<String, Long> data;

    public FileProcessedMessage(Map<String, Long> data) {
        this.data = data;
    }

    public Map<String, Long> getData() {
        return data;
    }
}
3. Line Processing Result
public class LineProcessingResult {
    private String ipAddress;

    public LineProcessingResult(String ipAddress) {
        this.ipAddress = ipAddress;
    }

    public String getIpAddress() {
        return ipAddress;
    }
}
4. Log Line Message
public class LogLineMessage {
    private String data;

    public LogLineMessage(String data) {
        this.data = data;
    }

    public String getData() {
        return data;
    }
}
I am creating an actor for each line in the file.
With all concurrency frameworks there is always a trade-off between the amount of concurrency deployed and the complexity involved for each unit of concurrency. Akka is no exception.
In your non-akka approach you have a relatively simple sequence of steps for each line:

1. read the line from the file
2. parse the IP address out of the line
3. increment that IP's count in the hash map

By comparison, your akka approach is much more complicated for each line:

1. read the line from the file
2. wrap the line in a LogLineMessage message
3. create a new actor for the line
4. send the message to the new actor
5. the actor parses the IP address out of the line
6. the actor wraps the IP in a LineProcessingResult message
7. the actor sends that message back to the coordinating actor
8. the coordinator increments that IP's count in the hash map

If we naively assumed each of the above steps took the same amount of time, then you would need at least 2 threads with akka (roughly 8 steps vs. 3) just to run at the same speed as 1 thread without akka.
Make Each Concurrency Unit Do More Work
Instead of having 1 Actor per line, have each Actor process N lines into its own sub-hashmap (e.g. each Actor processes 1000 lines):
public class LogLineMessage {
    private String[] data;

    public LogLineMessage(String[] data) {
        this.data = data;
    }

    public String[] getData() {
        return data;
    }
}
Then the Actor wouldn't send back something as simple as a single IP address. Instead, it sends back a map of counts for its subset of lines:
public class LineProcessingResult {
    private HashMap<String, Long> ipAddressCount;

    public LineProcessingResult(HashMap<String, Long> count) {
        this.ipAddressCount = count;
    }

    public HashMap<String, Long> getIpAddressCount() {
        return ipAddressCount;
    }
}
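For the processing side, here is a minimal sketch of what the batched LogLineProcessor might look like (this version is assumed, not shown in the original code; it consumes the String[] payload above):
public class LogLineProcessor extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof LogLineMessage) {
            // count IP occurrences across this actor's entire batch of lines
            HashMap<String, Long> localCount = new HashMap<>();
            for (String line : ((LogLineMessage) message).getData()) {
                int idx = line.indexOf('-');
                if (idx != -1) {
                    String ipAddress = line.substring(0, idx).trim();
                    localCount.put(ipAddress, localCount.getOrDefault(ipAddress, 0L) + 1);
                }
            }
            // reply once per batch with the sub-map, instead of once per line
            this.getSender().tell(new LineProcessingResult(localCount), this.getSelf());
        } else {
            this.unhandled(message);
        }
    }
}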
And the coordinating Actor can be responsible for combining all of the various sub-counts:
//inside of FileAnalysisActor
else if (message instanceof LineProcessingResult) {
    HashMap<String, Long> localCount =
            ((LineProcessingResult) message).getIpAddressCount();
    // merge this batch's counts into the global map
    localCount.forEach((ipAddress, count) ->
            ipMap.put(ipAddress, ipMap.getOrDefault(ipAddress, 0L) + count));
}
You can then vary N to see where you get peak performance for your particular system.
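For example, a rough tuning harness (analyzeFile here is a hypothetical helper that runs the actor pipeline with the given batch size and blocks until the result arrives):
// hypothetical harness: time the full analysis for several batch sizes
int[] candidates = {100, 500, 1000, 5000, 10000};
for (int n : candidates) {
    long start = System.currentTimeMillis();
    analyzeFile("log.txt", n); // assumed helper: runs the pipeline with batch size n
    System.out.println("N=" + n + " took " + (System.currentTimeMillis() - start) + " ms");
}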
Don't Read the Whole File Into Memory
One other disadvantage of your concurrent solution is that it first reads the entire file into memory. This is unnecessary and taxing for the JVM.
Instead, read the file N lines at a time. Once you have those lines in memory, spawn off an Actor as described above:
BufferedReader br = new BufferedReader(new FileReader(file));
final int N = 1000;
String[] lineBuffer = new String[N];
int bufferCount = 0;
String line = br.readLine();
while (line != null) {
    lineBuffer[bufferCount++] = line;
    if (bufferCount == N) {
        // buffer is full: hand it off to a new actor and start a fresh buffer
        Props props = Props.create(LogLineProcessor.class);
        ActorRef lineProcessorActor = this.getContext().actorOf(props);
        lineProcessorActor.tell(new LogLineMessage(lineBuffer), this.getSelf());
        lineBuffer = new String[N];
        bufferCount = 0;
    }
    line = br.readLine();
}
// handle the final, partially filled buffer
if (bufferCount > 0) {
    Props props = Props.create(LogLineProcessor.class);
    ActorRef lineProcessorActor = this.getContext().actorOf(props);
    // copy only the filled portion so the actor doesn't see trailing nulls
    lineProcessorActor.tell(new LogLineMessage(Arrays.copyOf(lineBuffer, bufferCount)),
            this.getSelf());
}
br.close();
This will allow for File IO, line processing, and sub-map combining to all run in parallel.
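One caveat with batching: the coordinator's completion check (fileLineCount == processedCount) counts individual lines, but results now arrive once per batch. A minimal adjustment, assuming a hypothetical batchesSent counter that is incremented each time a buffer is dispatched above:
// inside FileAnalysisActor, after merging a LineProcessingResult
processedCount++; // now counts completed batches, not lines
if (processedCount == batchesSent) { // batchesSent: assumed count of dispatched buffers
    // every batch has reported back; return the combined result
    analyticsSender.tell(new FileProcessedMessage(ipMap), ActorRef.noSender());
}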