I'm trying to use Akka HTTP and Akka Streams to run a scraper. I start with a set of index pages and parse the links out of them, then fetch and parse each linked child page to collect the links it contains. So, like this:
fetch-top-level-page -> list-of-links-to-child-pages -> fetch-child-page -> list-of-links-in-child-page
My problem is that I'm not even able to fetch a single page. Each top-level URL I try to fetch results in a dead letter, and nothing ever makes it further down the pipeline.
In this sample code, all I'm trying to do is send each HttpRequest into the pool to be transformed into an HttpResponse, and prove that it worked by printing stuff to the screen.
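import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Promise
implicit val system = ActorSystem("scraper")
implicit val ec = system.dispatcher
implicit val settings = system.settings
implicit val materializer = ActorMaterializer()
// superPool[T] consumes (HttpRequest, T) pairs; here T is Promise[HttpResponse]
val requests = List((HttpRequest(...), Promise[HttpResponse]()), (HttpRequest(...), Promise[HttpResponse]()))
val poolClientFlow = Http().superPool[Promise[HttpResponse]](settings = ConnectionPoolSettings(system).withMaxConnections(10))
Source(requests)
  .map(req => { println("-", req); req }) // this part runs fine
  .via(poolClientFlow)
  .map(resp => { println("|", resp); resp }) // this never runs
  .toMat(Sink.foreach { p => println(p) })(Keep.both)
  .run()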
Here's what I get:
(-,(HttpRequest(...),Future(<not completed>)))
(-,(HttpRequest(...),Future(<not completed>)))
[INFO] [03/07/2018 15:10:03.681] [scraper-akka.actor.default-dispatcher-5] [akka://scraper/user/pool-master] Message [akka.http.impl.engine.client.PoolMasterActor$SendRequest] without sender to Actor[akka://scraper/user/pool-master#1333123700] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [03/07/2018 15:10:03.683] [scraper-akka.actor.default-dispatcher-5] [akka://scraper/user/pool-master] Message [akka.http.impl.engine.client.PoolMasterActor$SendRequest] without sender to Actor[akka://scraper/user/pool-master#1333123700] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
I'm new to Akka and obviously making some basic error, since this seems to be the exact use case that Akka, Akka Streams, and Akka HTTP were built for.
Any ideas?
I'm unable to reproduce this with Akka HTTP 10.1.0-RC2 and Akka Streams 2.5.11. The following works:
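import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Promise
// (uses the same implicit ActorSystem and ActorMaterializer as in the question)
val requests = List(
  (HttpRequest(uri = "http://akka.io"), Promise[HttpResponse]()),
  (HttpRequest(uri = "http://www.yahoo.com"), Promise[HttpResponse]()))
val poolClientFlow =
  Http().superPool[Promise[HttpResponse]](settings = ConnectionPoolSettings(system).withMaxConnections(10))
Source(requests)
  .map { req => println("-", req); req }
  .via(poolClientFlow)
  .map { resp => println("|", resp); resp }
  .toMat(Sink.foreach(println))(Keep.both)
  .run()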
// The following is printed:
// (-,(HttpRequest(HttpMethod(GET),http://akka.io,List(),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1)),Future()))
// (-,(HttpRequest(HttpMethod(GET),http://www.yahoo.com,List(),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1)),Future()))
// (|,(Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:33:46 GMT, Connection: keep-alive, Cache-Control: max-age=3600, Expires: Thu, 08 Mar 2018 15:33:46 GMT, Location: https://akka.io/, Server: cloudflare, CF-RAY: 3f8604d386979cf6-AMS),HttpEntity.Chunked(application/octet-stream),HttpProtocol(HTTP/1.1))),Future()))
// (Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:33:46 GMT, Connection: keep-alive, Cache-Control: max-age=3600, Expires: Thu, 08 Mar 2018 15:33:46 GMT, Location: https://akka.io/, Server: cloudflare, CF-RAY: 3f8604d386979cf6-AMS),HttpEntity.Chunked(application/octet-stream),HttpProtocol(HTTP/1.1))),Future())
// (|,(Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:33:46 GMT, Connection: keep-alive, Via: http/1.1 media-router-fp6.prod.media.ir2.yahoo.com (ApacheTrafficServer [c s f ]), Server: ATS, Cache-Control: no-store, no-cache, Content-Language: en, X-Frame-Options: SAMEORIGIN, Location: https://www.yahoo.com/),HttpEntity.Strict(text/html,ByteString(114, 101, 100, 105, 114, 101, 99, 116)),HttpProtocol(HTTP/1.1))),Future()))
// (Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:33:46 GMT, Connection: keep-alive, Via: http/1.1 media-router-fp6.prod.media.ir2.yahoo.com (ApacheTrafficServer [c s f ]), Server: ATS, Cache-Control: no-store, no-cache, Content-Language: en, X-Frame-Options: SAMEORIGIN, Location: https://www.yahoo.com/),HttpEntity.Strict(text/html,ByteString(114, 101, 100, 105, 114, 101, 99, 116)),HttpProtocol(HTTP/1.1))),Future())
// [WARN] [03/08/2018 14:46:31.003] [scraper-akka.actor.default-dispatcher-4] [scraper/Pool(shared->http://akka.io:80)] [0 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it. GET / Empty -> 301 Moved Permanently Chunked
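In the snippet above, superPool[T] is a Flow whose input elements are (HttpRequest, T) pairs and whose output elements are (Try[HttpResponse], T) pairs. The T value (here a Promise) is passed through untouched so each result can be matched back to its request, because the pool does not promise to emit responses in request order. The sketch below only spells those types out, which is also why the source list has to hold pairs rather than bare HttpRequest values. The object name PoolTypes is just for illustration, and like the question's code it assumes an implicit ActorSystem and ActorMaterializer (Akka 2.5 / Akka HTTP 10.1 APIs).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import scala.concurrent.Promise
import scala.util.Try

object PoolTypes {
  implicit val system: ActorSystem = ActorSystem("scraper")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // In:  a request paired with a caller-chosen context value (here a Promise).
  // Out: that request's outcome, paired with the same context value.
  val poolClientFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
    Http().superPool[Promise[HttpResponse]](settings = ConnectionPoolSettings(system).withMaxConnections(10))

  // So the Source must emit pairs; a bare List[HttpRequest] would not fit this Flow.
  val requests: List[(HttpRequest, Promise[HttpResponse])] = List(
    (HttpRequest(uri = "http://akka.io"), Promise[HttpResponse]()),
    (HttpRequest(uri = "http://www.yahoo.com"), Promise[HttpResponse]()))
}
```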
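Probably a better approach is something like this, which calls discardEntityBytes() on each successful response so its entity is consumed, as the warning above asks (an unconsumed entity keeps its pooled connection busy):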
Source(requests)
  .map { req => println("-", req); req }
  .via(poolClientFlow)
  .map { resp => println("|", resp); resp }
  .toMat(Sink.foreach({
    case ((util.Success(resp), p)) =>
      resp.discardEntityBytes()
      p.success(resp)
    case ((util.Failure(e), p)) => p.failure(e)
  }))(Keep.both)
  .run()
// The following is printed:
// (-,(HttpRequest(HttpMethod(GET),http://akka.io,List(),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1)),Future()))
// (-,(HttpRequest(HttpMethod(GET),http://www.yahoo.com,List(),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1)),Future()))
// (|,(Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:38:32 GMT, Connection: keep-alive, Via: http/1.1 media-router-fp21.prod.media.ir2.yahoo.com (ApacheTrafficServer [c s f ]), Server: ATS, Cache-Control: no-store, no-cache, Content-Language: en, X-Frame-Options: SAMEORIGIN, Location: https://www.yahoo.com/),HttpEntity.Strict(text/html,ByteString(114, 101, 100, 105, 114, 101, 99, 116)),HttpProtocol(HTTP/1.1))),Future()))
// (|,(Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:38:32 GMT, Connection: keep-alive, Cache-Control: max-age=3600, Expires: Thu, 08 Mar 2018 15:38:32 GMT, Location: https://akka.io/, Server: cloudflare, CF-RAY: 3f860bca84a32ba6-AMS),HttpEntity.Chunked(application/octet-stream),HttpProtocol(HTTP/1.1))),Future()))
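For the scraper itself you need the page bodies, so the warning's other option (reading the entity) is the relevant one there. Here is a rough sketch of the whole fetch-parse-fetch-parse chain from the question as a single stream, written against the same Akka 2.5 / Akka HTTP 10.1 APIs. Assumptions, all mine: the index URLs are placeholders, Links.extractLinks is a hypothetical regex helper (not part of Akka) that only finds absolute http(s) links, a failed fetch is treated as an empty page, and the 10-second toStrict timeout and parallelism of 4 are arbitrary.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical helper, not part of Akka: a naive regex that only finds absolute
// http(s) links in href="..." attributes. A real scraper would use an HTML parser.
object Links {
  private val HrefPattern = """href="(https?://[^"]+)"""".r

  def extractLinks(html: String): List[String] =
    HrefPattern.findAllMatchIn(html).map(_.group(1)).toList
}

object ScraperPipeline {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("scraper")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // One fetch stage, reused for both levels. The context value carried through the
    // pool is the URL; every successful response body is read with toStrict, so no
    // entity is left unconsumed.
    val fetch: Flow[(HttpRequest, String), (String, String), Any] =
      Http().superPool[String]().mapAsyncUnordered(4) {
        case (Success(resp), url) =>
          resp.entity.toStrict(10.seconds).map(strict => url -> strict.data.utf8String)
        case (Failure(_), url) =>
          Future.successful(url -> "") // a failed fetch is treated as an empty page
      }

    // Placeholder index pages; substitute the real top-level URLs.
    val topLevelPages = List("http://example.com/index1", "http://example.com/index2")

    Source(topLevelPages)
      .map(url => HttpRequest(uri = url) -> url)                // fetch-top-level-page
      .via(fetch)
      .mapConcat { case (_, html) => Links.extractLinks(html) } // list-of-links-to-child-pages
      .map(url => HttpRequest(uri = url) -> url)                // fetch-child-page
      .via(fetch)
      .mapConcat { case (_, html) => Links.extractLinks(html) } // list-of-links-in-child-page
      .runWith(Sink.foreach(println))
      .onComplete(_ => system.terminate()) // shut down only after the stream finishes
  }
}
```

Reusing the same fetch flow for both levels should be fine: a Flow is only a blueprint, and both materializations go through the shared superPool host pools.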