将整个HttpResponse正文作为字符串与Akka-Stream HTTP一起使用

我试图了解如何使用新的akka.http库。我想向服务器发送一个http请求,并将整个响应主体作为一个字符串读取,以产生一个Source[String,?]。将整个HttpResponse正文作为字符串与Akka-Stream HTTP一起使用

这里是最好的解决办法我能到目前为止产生:

def get(

modelID: String,

pool: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool]

): Source[String,Unit] = {

val uri = reactionsURL(modelID)

val req = HttpRequest(uri = uri)

Source.single((req,0))

.via(pool)

.map {

case (Success(resp),_) =>

resp.entity.dataBytes.map(_.decodeString("utf-8"))

}.flatten(FlattenStrategy.concat)

.grouped(1024)

.map(_.mkString)

它似乎运作良好(除缺少的错误路径),但它是这种简单的任务有点笨重。有更聪明的解决方案吗?我可以避开grouped/mkString

回答:

您可以使用toStrict方法HttpResponse超时。它将整个答案汇总为未来。

DEF toStrict(超时:FiniteDuration)(隐式EC:的ExecutionContext,FM:Materializer):未来[严格]返回该消息的一个可共享的和序列化

副本具有严格的实体。

实施例:

import akka.actor.ActorSystem 

import akka.http.scaladsl.Http

import akka.http.scaladsl.model.{HttpResponse, HttpRequest}

import akka.stream.{Materializer, ActorMaterializer}

import akka.stream.scaladsl.{Sink, Flow, Source}

import scala.concurrent.{ExecutionContext, Future}

import scala.concurrent.duration._

import scala.util.{Try, Success}

object Main extends App {

implicit val system = ActorSystem()

import system.dispatcher

implicit val materializer = ActorMaterializer()

val host = "127.0.0.1"

lazy val pool = Http().newHostConnectionPool[Int](host, 9000)

FlowBuilder.get("/path", pool).to(Sink.foreach(_.foreach(println))).run()

}

object FlowBuilder {

def get(modelID: String, pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool])

(implicit ec: ExecutionContext, mat: Materializer): Source[Future[String], Unit] = {

val uri = modelID

val req = HttpRequest(uri = modelID)

Source.single((req, 0)).via(pool)

.map {

case (Success(resp), _) => resp.entity.toStrict(5 seconds).map(_.data.decodeString("UTF-8"))

}

}

}

回答:

可以使用Unmarshall这也将在其他类型的工作例如json来自spray-json。这也作为strict返回Future[_]

例子:

authedReq.via(authServerReqResFlow).mapAsync(1) { case (tryRes, _) => 

tryRes match {

case Failure(exception) => Future.failed[Principal](exception)

case Success(response @ HttpResponse(StatusCodes.OK,_,_,_)) =>

val userContext = Unmarshal(response).to[UserContextData]

userContext.map {

case UserContextData(UserInfo(_, userName, fullName, email, title), _, _) =>

Principal(userName, fullName, email, title)

}

case Success(response @ HttpResponse(responseCode,_,entity,_)) =>

Unmarshal(entity).to[String].flatMap(msg => Future.failed(new AuthenticationFailure(s"$responseCode\n$msg")))

}

}

以上是 将整个HttpResponse正文作为字符串与Akka-Stream HTTP一起使用 的全部内容, 来源链接: utcz.com/qa/257159.html

回到顶部