在前面一篇讨论里我们介绍了通过http进行文件的交换，以及ROW->Json->ByteString或者反方向的转换。

在客户端试运行返回结果显示：

 

  /** Exception handler for the upload route.
    *
    * On a `RuntimeException` it discards the bytes of the current request
    * entity and then completes with status code 500 and the text
    * "Upload Failed!". Other throwables are not matched by this handler.
    */
  def postExceptionHandler: ExceptionHandler = {
    val onRuntimeFailure: ExceptionHandler.PF = {
      case _: RuntimeException =>
        extractRequest { failedRequest =>
          // Drain what is left of the request body before answering.
          failedRequest.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue, "Upload Failed!"))
        }
    }
    ExceptionHandler(onRuntimeFailure)
  }

      // POST: stream the uploaded rows and answer with the collected names.
      post {
        withoutSizeLimit { // lift the entity size limit for this route
          handleExceptions(postExceptionHandler) {
            entity(asSourceOf[County]) { rows =>
              // Accumulate in a Vector: appending to a List with `++` copies the
              // accumulator for every element (quadratic in the row count).
              // NOTE(review): the seed keeps the leading "" so the JSON response
              // stays identical to the output shown in the article.
              val futofNames: Future[Vector[String]] =
                rows.runFold(Vector(""))((names, county) => names :+ county.name)
              complete(futofNames)
            }
          }
        }
      }

考虑到在数据转换的过程中可能会出现异常，需要异常处理方法来释放backpressure：

 

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling.Unmarshal

// Combines the spray-json protocol formats with akka-http's spray-json support.
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row model and its JSON format, made available through `import Converters._`. */
object Converters extends MyFormats {
  /** One table row: numeric id plus display name. */
  final case class County(id: Int, name: String)

  // Explicit type on the implicit: an inferred implicit type makes implicit
  // resolution depend on definition order, and Scala 3 rejects it outright.
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}

/** Demo client: downloads the rows served at `/rows` and prints each one. */
object HttpDBClient extends App {
  import Converters._

  implicit val sys: ActorSystem = ActorSystem("ClientSys")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = sys.dispatcher

  // Streaming support picked up implicitly when unmarshalling to Source[County, NotUsed].
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  /** Sends `request` and, on a 200 response, unmarshals the entity into a
    * stream of `County` rows that is printed element by element.
    * Non-OK responses and failures are reported on stdout.
    *
    * @param request the download request to send
    * @return the response future of the underlying `singleRequest` call
    */
  def downloadRows(request: HttpRequest) = {
    val futResp = Http(sys).singleRequest(request)
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          // `onSuccess` is deprecated since Scala 2.12 and silently dropped
          // unmarshalling failures; handle both outcomes explicitly.
          Unmarshal(entity).to[Source[County,NotUsed]].onComplete {
            case Success(rows) =>
              rows.runForeach(println).failed.foreach { err =>
                println(s"download failed: ${err.getMessage}")
              }
            case Failure(err) =>
              println(s"download failed: ${err.getMessage}")
              // NOTE(review): presumably the entity was not consumed when
              // unmarshalling fails, so drain it here — confirm.
              r.discardEntityBytes()
          }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download rows!")
        case Failure(err) => println(s"download failed: ${err.getMessage}")

      }
  }
  downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))

  // Keep the process alive until the user presses Enter, then shut down.
  scala.io.StdIn.readLine()

  sys.terminate()

}

下面就让我们开始写些代码吧。首先，我们用一个case class代表数据库表行结构，然后用它作为流元素来构建一个Source，如下：

     // POST: stream the uploaded rows and answer with the collected names.
     post {
        withoutSizeLimit { // lift the entity size limit for this route
          entity(asSourceOf[County]) { rows =>
            // Accumulate in a Vector: appending to a List with `++` copies the
            // accumulator for every element (quadratic in the row count).
            // NOTE(review): the seed keeps the leading "" so the JSON response
            // stays identical to the output shown in the article.
            val futofNames: Future[Vector[String]] =
              rows.runFold(Vector(""))((names, county) => names :+ county.name)
            complete(futofNames)
          }
        }
      }

 

 

假如服务端收到数据后以Akka-stream方式再转换成一个List返回，我们用下面的方法来测试功能：

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport


// Combines the spray-json protocol formats with akka-http's spray-json support.
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row model, sample data and JSON format, made available through `import Converters._`. */
object Converters extends MyFormats {
  /** One table row: numeric id plus display name. */
  final case class County(id: Int, name: String)

  // Sample data: a stream of five generated rows.
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

  // Explicit type on the implicit: an inferred implicit type makes implicit
  // resolution depend on definition order, and Scala 3 rejects it outright.
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}

/** Demo server: streams the sample rows as JSON from GET /rows on localhost:8011. */
object HttpDBServer extends App {
  import Converters._

  implicit val httpSys: ActorSystem = ActorSystem("httpSystem")
  implicit val httpMat: ActorMaterializer = ActorMaterializer()
  implicit val httpEC: scala.concurrent.ExecutionContextExecutor = httpSys.dispatcher


  // JSON streaming support with parallel, order-preserving marshalling.
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  val route =
    path("rows") {
      get {
        // The Source itself is handed to `complete`.
        complete {
          source
        }
      }
    }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  // Binding is asynchronous: report a failed bind (e.g. port already in use)
  // instead of leaving the banner below as the only output.
  bindingFuture.failed.foreach { err =>
    println(s"Failed to bind to $host:$port: ${err.getMessage}")
  }

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  // Unbind first, then stop the actor system whether or not unbinding succeeded.
  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}
  // support for as[Source[T, NotUsed]]
  // Implicit unmarshaller that turns an HTTP entity into a stream of T.
  //  - `reader` is used to decode each framed element into a T;
  //  - `support` supplies the accepted content-type range, the framing decoder
  //    and the parallelism / ordering settings used below.
  // The returned future fails with UnsupportedContentTypeException when the
  // entity's content type is not matched by `support.supported`.
  implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] =
    Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ e ⇒
      if (support.supported.matches(e.contentType)) {
        // Cut the raw byte stream into frames using the support's framing decoder.
        val frames = e.dataBytes.via(support.framingDecoder)
        // Function applied to each frame to unmarshal it with `reader`.
        val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_)
        // Unordered or ordered asynchronous mapping, as configured by `support`.
        val unmarshallingFlow =
          if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal)
          else Flow[ByteString].mapAsync(support.parallelism)(unmarshal)
        val elements = frames.viaMat(unmarshallingFlow)(Keep.right)
        // The stream is returned un-run; the caller decides when to materialize it.
        FastFuture.successful(elements)
      } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))
    }

下面就让我们开始写些代码吧。首先，我们用一个case class代表数据库表行结构，然后用它作为流元素来构建一个Source，如下：

 

 

 

 

     case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          // Unmarshal the response entity into a stream of County rows.
          val futSource = Unmarshal(entity).to[Source[County,NotUsed]]
          // `foreach` replaces `onSuccess`, which is deprecated since Scala 2.12.
          futSource.foreach { source =>
            source.runForeach(println)
          }

 

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling._

// Combines the spray-json protocol formats with akka-http's spray-json support.
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row model and its JSON format, made available through `import Converters._`. */
object Converters extends MyFormats {
  /** One table row: numeric id plus display name. */
  final case class County(id: Int, name: String)

  // Explicit type on the implicit: an inferred implicit type makes implicit
  // resolution depend on definition order, and Scala 3 rejects it outright.
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}

/** Demo client: downloads the rows served at `/rows`, then uploads its own sample rows. */
object HttpDBClient extends App {
  import Converters._

  implicit val sys: ActorSystem = ActorSystem("ClientSys")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = sys.dispatcher

  // Streaming support picked up implicitly when unmarshalling to Source[County, NotUsed].
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  /** Sends `request` and, on a 200 response, unmarshals the entity into a
    * stream of `County` rows that is printed element by element.
    * Non-OK responses and failures are reported on stdout.
    *
    * @param request the download request to send
    * @return the response future of the underlying `singleRequest` call
    */
  def downloadRows(request: HttpRequest) = {
    val futResp = Http(sys).singleRequest(request)
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          // `onSuccess` is deprecated since Scala 2.12 and silently dropped
          // unmarshalling failures; handle both outcomes explicitly.
          Unmarshal(entity).to[Source[County,NotUsed]].onComplete {
            case Success(rows) =>
              rows.runForeach(println).failed.foreach { err =>
                println(s"download failed: ${err.getMessage}")
              }
            case Failure(err) =>
              println(s"download failed: ${err.getMessage}")
              // NOTE(review): presumably the entity was not consumed when
              // unmarshalling fails, so drain it here — confirm.
              r.discardEntityBytes()
          }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download rows!")
        case Failure(err) => println(s"download failed: ${err.getMessage}")

      }
  }
  downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))


  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  // Five sample rows to upload.
  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}

  // Serialises one row to its JSON text and wraps it in a ByteString.
  def countyToByteString(c: County) = {
    ByteString(c.toJson.toString)
  }
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)

  // Byte stream of the serialised rows, passed through limitableByteSource.
  val rowBytes = limitableByteSource(source via flowCountyToByteString)

  // POST request template and the JSON entity carrying the row bytes.
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
  val data = HttpEntity(
    ContentTypes.`application/json`,
    rowBytes
  )

  /** Sends `request` with `dataEntity` as its body and reports the outcome on stdout.
    *
    * A 200 response has its body printed chunk by chunk as UTF-8 text; any other
    * response is reported with its status code and its entity is discarded.
    *
    * @param request    request template; its entity is replaced by `dataEntity`
    * @param dataEntity entity to upload
    * @return the response future of the underlying `singleRequest` call
    */
  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
      request.copy(entity = dataEntity)
    )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")

      }
  }

  uploadRows(request,data)

  // Keep the process alive until the user presses Enter, then shut down.
  scala.io.StdIn.readLine()

  sys.terminate()

}

 

在上面的代码里我们直接把source放进了complete()，然后期望这个directive能通过ToEntityMarshaller[County]类实例用Spray-Json把Source[County,NotUsed]转换成Source[ByteString,NotUsed]，然后放入HttpResponse的HttpEntity里。转换结果只能在客户端得到验证。我们知道HttpResponse里的Entity.dataBytes就是一个Source[ByteString,_]，我们可以把它Unmarshall成Source[County,_]，然后用Akka-stream来操作：

 

 

 

下面是这次讨论的示范代码：

  /** Exception handler for the upload route.
    *
    * On a `RuntimeException` it discards the bytes of the current request
    * entity and then completes with status code 500 and the text
    * "Upload Failed!". Other throwables are not matched by this handler.
    */
  def postExceptionHandler: ExceptionHandler = {
    val onRuntimeFailure: ExceptionHandler.PF = {
      case _: RuntimeException =>
        extractRequest { failedRequest =>
          // Drain what is left of the request body before answering.
          failedRequest.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue, "Upload Failed!"))
        }
    }
    ExceptionHandler(onRuntimeFailure)
  }

      // POST: stream the uploaded rows and answer with the collected names.
      post {
        withoutSizeLimit { // lift the entity size limit for this route
          handleExceptions(postExceptionHandler) {
            entity(asSourceOf[County]) { rows =>
              // Accumulate in a Vector: appending to a List with `++` copies the
              // accumulator for every element (quadratic in the row count).
              // NOTE(review): the seed keeps the leading "" so the JSON response
              // stays identical to the output shown in the article.
              val futofNames: Future[Vector[String]] =
                rows.runFold(Vector(""))((names, county) => names :+ county.name)
              complete(futofNames)
            }
          }
        }
      }

服务端接收数据处理办法如下:

 

 

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._

// Combines the spray-json protocol formats with akka-http's spray-json support.
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row model, sample data and JSON format, made available through `import Converters._`. */
object Converters extends MyFormats {
  /** One table row: numeric id plus display name. */
  final case class County(id: Int, name: String)

  // Sample data: a stream of five generated rows.
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

  // Explicit type on the implicit: an inferred implicit type makes implicit
  // resolution depend on definition order, and Scala 3 rejects it outright.
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}

/** Demo server on localhost:8011: GET /rows streams the sample rows as JSON,
  * POST /rows consumes an uploaded row stream and answers with the row names.
  */
object HttpDBServer extends App {
  import Converters._

  implicit val httpSys: ActorSystem = ActorSystem("httpSystem")
  implicit val httpMat: ActorMaterializer = ActorMaterializer()
  implicit val httpEC: scala.concurrent.ExecutionContextExecutor = httpSys.dispatcher


  // JSON streaming support with parallel, order-preserving marshalling.
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  /** On a `RuntimeException`, discards the request entity bytes and completes
    * with status code 500 and the text "Upload Failed!".
    */
  def postExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case _: RuntimeException =>
        extractRequest { req =>
          // Drain what is left of the request body before answering.
          req.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))
        }
    }

  val route =
    path("rows") {
      get {
        // The Source itself is handed to `complete`.
        complete {
          source
        }
      } ~
      post {
        withoutSizeLimit { // lift the entity size limit for this route
          handleExceptions(postExceptionHandler) {
            // `rows` instead of `source`, which shadowed Converters.source.
            entity(asSourceOf[County]) { rows =>
              // Accumulate in a Vector: appending to a List with `++` copies the
              // accumulator for every element (quadratic in the row count).
              // NOTE(review): the seed keeps the leading "" so the JSON response
              // stays identical to the output shown in the article.
              val futofNames: Future[Vector[String]] =
                rows.runFold(Vector(""))((names, county) => names :+ county.name)
              complete(futofNames)
            }
          }
        }
      }
    }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  // Binding is asynchronous: report a failed bind (e.g. port already in use)
  // instead of leaving the banner below as the only output.
  bindingFuture.failed.foreach { err =>
    println(s"Failed to bind to $host:$port: ${err.getMessage}")
  }

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  // Unbind first, then stop the actor system whether or not unbinding succeeded.
  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

我们先设计服务端的数据下载部分：

服务端接收数据处理办法如下:

 

上面这个Unmarshal调用了下面这个FromEntityUnmarshaller[Source[County,NotUsed]]隐式实例：

  /** Sends `request` with `dataEntity` as its body and reports the outcome on stdout.
    *
    * A 200 response has its body printed chunk by chunk as UTF-8 text; any other
    * response is reported with its status code and its entity is discarded.
    *
    * @param request    request template; its entity is replaced by `dataEntity`
    * @param dataEntity entity to upload
    * @return the response future of the underlying `singleRequest` call
    */
  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
    val upload = request.copy(entity = dataEntity)
    Http(sys).singleRequest(upload).andThen {
      case Failure(err) =>
        println(s"Upload failed: ${err.getMessage}")
      case Success(response) =>
        response match {
          case HttpResponse(StatusCodes.OK, _, body, _) =>
            body.dataBytes.map(_.utf8String).runForeach(println)
          case HttpResponse(code, _, _, _) =>
            println(s"Upload request failed, response code: $code")
            response.discardEntityBytes()
          case _ =>
            println("Unable to Upload file!")
        }
    }
  }
  // Start the upload with the request template and entity prepared earlier.
  uploadRows(request,data)

["","广西壮族自治区地市县编号 #1","广西壮族自治区地市县编号 #2","广西壮族自治区地市县编号 #3","广西壮族自治区地市县编号 #4","广西壮族自治区地市县编号 #5"]

 

客户端:

 

 

考虑到在数据转换的过程中可能会出现异常，需要异常处理方法来释放backpressure：

正是我们期望的结果。

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  // Five sample rows to upload.
  val source: Source[County,NotUsed] =
    Source(1 to 5).map(i => County(i, s"广西壮族自治区地市县编号 #$i"))

  // Serialises one row to its JSON text and wraps it in a ByteString.
  def countyToByteString(c: County) = {
    val json = c.toJson
    ByteString(json.toString)
  }

  // Stream stage applying countyToByteString to every row.
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] =
    Flow[County].map(countyToByteString)

  // Byte stream of the serialised rows, passed through limitableByteSource.
  val rowBytes = limitableByteSource(source.via(flowCountyToByteString))

  // POST request template and the JSON entity carrying the row bytes.
  val request = HttpRequest(method = HttpMethods.POST, uri = s"http://localhost:8011/rows")
  val data = HttpEntity(ContentTypes.`application/json`, rowBytes)

我们直接用toJson函数进行County->Json转换实现了flowCountyToByteString。toJson是Spray-Json提供的一个函数：
// Excerpt of spray-json's package-level definitions, quoted for reference.
package json {

  // Runtime exception carrying a message, an optional cause and a list of field names.
  case class DeserializationException(msg: String, cause: Throwable = null, fieldNames: List[String] = Nil) extends RuntimeException(msg, cause)
  // Runtime exception carrying only a message.
  class SerializationException(msg: String) extends RuntimeException(msg)

  // Wrapper around an arbitrary value that offers `toJson`.
  // NOTE(review): presumably applied through an implicit conversion that is not part of this excerpt.
  private[json] class PimpedAny[T](any: T) {
    // Delegates to the JsonWriter[T] found in implicit scope.
    def toJson(implicit writer: JsonWriter[T]): JsValue = writer.write(any)
  }

  // Wrapper around a String that offers JSON parsing.
  private[json] class PimpedString(string: String) {
    @deprecated("deprecated in favor of parseJson", "1.2.6")
    def asJson: JsValue = parseJson
    // Parses the wrapped string with JsonParser.
    def parseJson: JsValue = JsonParser(string)
  }
}
  /** Sends `request` with `dataEntity` as its body and reports the outcome on stdout.
    *
    * A 200 response has its body printed chunk by chunk as UTF-8 text; any other
    * response is reported with its status code and its entity is discarded.
    *
    * @param request    request template; its entity is replaced by `dataEntity`
    * @param dataEntity entity to upload
    * @return the response future of the underlying `singleRequest` call
    */
  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
    val upload = request.copy(entity = dataEntity)
    Http(sys).singleRequest(upload).andThen {
      case Failure(err) =>
        println(s"Upload failed: ${err.getMessage}")
      case Success(response) =>
        response match {
          case HttpResponse(StatusCodes.OK, _, body, _) =>
            body.dataBytes.map(_.utf8String).runForeach(println)
          case HttpResponse(code, _, _, _) =>
            println(s"Upload request failed, response code: $code")
            response.discardEntityBytes()
          case _ =>
            println("Unable to Upload file!")
        }
    }
  }

服务端:

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  // Five sample rows to upload.
  val source: Source[County,NotUsed] =
    Source(1 to 5).map(i => County(i, s"广西壮族自治区地市县编号 #$i"))

  // Serialises one row to its JSON text and wraps it in a ByteString.
  def countyToByteString(c: County) = {
    val json = c.toJson
    ByteString(json.toString)
  }

  // Stream stage applying countyToByteString to every row.
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] =
    Flow[County].map(countyToByteString)

  // Byte stream of the serialised rows, passed through limitableByteSource.
  val rowBytes = limitableByteSource(source.via(flowCountyToByteString))

  // POST request template and the JSON entity carrying the row bytes.
  val request = HttpRequest(method = HttpMethods.POST, uri = s"http://localhost:8011/rows")
  val data = HttpEntity(ContentTypes.`application/json`, rowBytes)

我们直接用toJson函数进行County->Json转换实现了flowCountyToByteString。toJson是Spray-Json提供的一个函数：
// Excerpt of spray-json's package-level definitions, quoted for reference.
package json {

  // Runtime exception carrying a message, an optional cause and a list of field names.
  case class DeserializationException(msg: String, cause: Throwable = null, fieldNames: List[String] = Nil) extends RuntimeException(msg, cause)
  // Runtime exception carrying only a message.
  class SerializationException(msg: String) extends RuntimeException(msg)

  // Wrapper around an arbitrary value that offers `toJson`.
  // NOTE(review): presumably applied through an implicit conversion that is not part of this excerpt.
  private[json] class PimpedAny[T](any: T) {
    // Delegates to the JsonWriter[T] found in implicit scope.
    def toJson(implicit writer: JsonWriter[T]): JsValue = writer.write(any)
  }

  // Wrapper around a String that offers JSON parsing.
  private[json] class PimpedString(string: String) {
    @deprecated("deprecated in favor of parseJson", "1.2.6")
    def asJson: JsValue = parseJson
    // Parses the wrapped string with JsonParser.
    def parseJson: JsValue = JsonParser(string)
  }
}

 

在客户端试运行返回结果显示：

以上我们已经实现了客户端从服务端下载一段数据库表行，然后以Akka-stream的操作方式来处理下载数据。那么反向交换，即从客户端上传一段表行的话，就需要把一个Source[T,_]转换成Source[ByteString,_]然后放进HttpRequest的HttpEntity里。服务端收到数据后又要进行反向的转换，即把Request.Entity.dataBytes从Source[ByteString,_]转回Source[T,_]。Akka-http在客户端没有提供像complete这样强大的自动化功能。我们可能需要自定义并提供像ToRequestMarshaller[Source[T,_]]这样的隐式实例。但Akka-http的Marshalling-type-class是个非常复杂的系统。如果我们的目的只是简单提供一个Source[ByteString,_]，我们是否可以直接调用Spray-Json的函数来进行ROW->Json->ByteString转换呢？如下：

 
在前面一篇讨论里我们介绍了通过http进行文件的交换。因为文件内容是以一堆bytes来表示的，而http消息的数据部分也是byte类型的，所以我们可以直接用Source[ByteString,_]来读取文件然后放进HttpEntity中。我们还提到：如果需要进行数据库数据交换的话，可以用Source[ROW,_]来表示库表行，但首先必须进行ROW -> ByteString的转换。在上期讨论我们提到过这种转换其实是ROW->Json->ByteString或者反方向的转换，在Akka-http里称之为Marshalling和Unmarshalling。Akka-http的Marshalling实现采用了type-class编程模式，需要为每一种类型与Json的转换在可视域内提供Marshaller[A,B]类型的隐式实例。Akka-http默认的Json工具库是Spray-Json，着重case class，而且要提供JsonFormat?(case-class)，其中?代表case class的参数个数，用起来略显复杂。不过因为是Akka-http的配套库，在将来Akka-http的持续发展中具有一定的优势，所以我们还是用它来进行下面的示范。

正是我们期望的结果。

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling._

// Combines the spray-json protocol formats with akka-http's spray-json support.
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row model and its JSON format, made available through `import Converters._`. */
object Converters extends MyFormats {
  /** One table row: numeric id plus display name. */
  final case class County(id: Int, name: String)

  // Explicit type on the implicit: an inferred implicit type makes implicit
  // resolution depend on definition order, and Scala 3 rejects it outright.
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}

/** Demo client: downloads the rows served at `/rows`, then uploads its own sample rows. */
object HttpDBClient extends App {
  import Converters._

  implicit val sys: ActorSystem = ActorSystem("ClientSys")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = sys.dispatcher

  // Streaming support picked up implicitly when unmarshalling to Source[County, NotUsed].
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  /** Sends `request` and, on a 200 response, unmarshals the entity into a
    * stream of `County` rows that is printed element by element.
    * Non-OK responses and failures are reported on stdout.
    *
    * @param request the download request to send
    * @return the response future of the underlying `singleRequest` call
    */
  def downloadRows(request: HttpRequest) = {
    val futResp = Http(sys).singleRequest(request)
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          // `onSuccess` is deprecated since Scala 2.12 and silently dropped
          // unmarshalling failures; handle both outcomes explicitly.
          Unmarshal(entity).to[Source[County,NotUsed]].onComplete {
            case Success(rows) =>
              rows.runForeach(println).failed.foreach { err =>
                println(s"download failed: ${err.getMessage}")
              }
            case Failure(err) =>
              println(s"download failed: ${err.getMessage}")
              // NOTE(review): presumably the entity was not consumed when
              // unmarshalling fails, so drain it here — confirm.
              r.discardEntityBytes()
          }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download rows!")
        case Failure(err) => println(s"download failed: ${err.getMessage}")

      }
  }
  downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))


  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  // Five sample rows to upload.
  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}

  // Serialises one row to its JSON text and wraps it in a ByteString.
  def countyToByteString(c: County) = {
    ByteString(c.toJson.toString)
  }
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)

  // Byte stream of the serialised rows, passed through limitableByteSource.
  val rowBytes = limitableByteSource(source via flowCountyToByteString)

  // POST request template and the JSON entity carrying the row bytes.
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
  val data = HttpEntity(
    ContentTypes.`application/json`,
    rowBytes
  )

  /** Sends `request` with `dataEntity` as its body and reports the outcome on stdout.
    *
    * A 200 response has its body printed chunk by chunk as UTF-8 text; any other
    * response is reported with its status code and its entity is discarded.
    *
    * @param request    request template; its entity is replaced by `dataEntity`
    * @param dataEntity entity to upload
    * @return the response future of the underlying `singleRequest` call
    */
  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
      request.copy(entity = dataEntity)
    )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")

      }
  }

  uploadRows(request,data)

  // Keep the process alive until the user presses Enter, then shut down.
  scala.io.StdIn.readLine()

  sys.terminate()

}

 

  // support for as[Source[T, NotUsed]]
  // Implicit unmarshaller that turns an HTTP entity into a stream of T.
  //  - `reader` is used to decode each framed element into a T;
  //  - `support` supplies the accepted content-type range, the framing decoder
  //    and the parallelism / ordering settings used below.
  // The returned future fails with UnsupportedContentTypeException when the
  // entity's content type is not matched by `support.supported`.
  implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] =
    Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ e ⇒
      if (support.supported.matches(e.contentType)) {
        // Cut the raw byte stream into frames using the support's framing decoder.
        val frames = e.dataBytes.via(support.framingDecoder)
        // Function applied to each frame to unmarshal it with `reader`.
        val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_)
        // Unordered or ordered asynchronous mapping, as configured by `support`.
        val unmarshallingFlow =
          if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal)
          else Flow[ByteString].mapAsync(support.parallelism)(unmarshal)
        val elements = frames.viaMat(unmarshallingFlow)(Keep.right)
        // The stream is returned un-run; the caller decides when to materialize it.
        FastFuture.successful(elements)
      } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))
    }
     // POST: stream the uploaded rows and answer with the collected names.
     post {
        withoutSizeLimit { // lift the entity size limit for this route
          entity(asSourceOf[County]) { rows =>
            // Accumulate in a Vector: appending to a List with `++` copies the
            // accumulator for every element (quadratic in the row count).
            // NOTE(review): the seed keeps the leading "" so the JSON response
            // stays identical to the output shown in the article.
            val futofNames: Future[Vector[String]] =
              rows.runFold(Vector(""))((names, county) => names :+ county.name)
            complete(futofNames)
          }
        }
      }

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport


// Combines the spray-json protocol formats with akka-http's spray-json support.
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row model, sample data and JSON format, made available through `import Converters._`. */
object Converters extends MyFormats {
  /** One table row: numeric id plus display name. */
  final case class County(id: Int, name: String)

  // Sample data: a stream of five generated rows.
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

  // Explicit type on the implicit: an inferred implicit type makes implicit
  // resolution depend on definition order, and Scala 3 rejects it outright.
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}

/** Demo server: streams the sample rows as JSON from GET /rows on localhost:8011. */
object HttpDBServer extends App {
  import Converters._

  implicit val httpSys: ActorSystem = ActorSystem("httpSystem")
  implicit val httpMat: ActorMaterializer = ActorMaterializer()
  implicit val httpEC: scala.concurrent.ExecutionContextExecutor = httpSys.dispatcher


  // JSON streaming support with parallel, order-preserving marshalling.
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  val route =
    path("rows") {
      get {
        // The Source itself is handed to `complete`.
        complete {
          source
        }
      }
    }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  // Binding is asynchronous: report a failed bind (e.g. port already in use)
  // instead of leaving the banner below as the only output.
  bindingFuture.failed.foreach { err =>
    println(s"Failed to bind to $host:$port: ${err.getMessage}")
  }

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  // Unbind first, then stop the actor system whether or not unbinding succeeded.
  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

 

 
在前面一篇讨论里我们介绍了通过http进行文件的交换。因为文件内容是以一堆bytes来表示的，而http消息的数据部分也是byte类型的，所以我们可以直接用Source[ByteString,_]来读取文件然后放进HttpEntity中。我们还提到：如果需要进行数据库数据交换的话，可以用Source[ROW,_]来表示库表行，但首先必须进行ROW -> ByteString的转换。在上期讨论我们提到过这种转换其实是ROW->Json->ByteString或者反方向的转换，在Akka-http里称之为Marshalling和Unmarshalling。Akka-http的Marshalling实现采用了type-class编程模式，需要为每一种类型与Json的转换在可视域内提供Marshaller[A,B]类型的隐式实例。Akka-http默认的Json工具库是Spray-Json，着重case class，而且要提供JsonFormat?(case-class)，其中?代表case class的参数个数，用起来略显复杂。不过因为是Akka-http的配套库，在将来Akka-http的持续发展中具有一定的优势，所以我们还是用它来进行下面的示范。

假如服务端收到数据后以Akka-stream方式再转换成一个List返回，我们用下面的方法来测试功能：

上面这个Unmarshal调用了下面这个FromEntityUnmarshaller[Source[County,NotUsed]]隐式实例：

 

客户端:

这个隐式实例是由Spray-Json提供的，在SprayJsonSupport.scala里。
下面是这部分客户端的完整代码：

  // Row model: one County stands for one database table row.
  case class County(id: Int, name: String)
  // Stream of five generated sample rows.
  val source: Source[County, NotUsed] =
    Source(1 to 5).map(i => County(i, s"中国广东省地区编号 #$i"))

在上面的代码里我们直接把source放进了complete()，然后期望这个directive能通过ToEntityMarshaller[County]类实例用Spray-Json把Source[County,NotUsed]转换成Source[ByteString,NotUsed]，然后放入HttpResponse的HttpEntity里。转换结果只能在客户端得到验证。我们知道HttpResponse里的Entity.dataBytes就是一个Source[ByteString,_]，我们可以把它Unmarshall成Source[County,_]，然后用Akka-stream来操作：

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._

// Combines the spray-json protocol formats with akka-http's spray-json support.
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row model, sample data and JSON format, made available through `import Converters._`. */
object Converters extends MyFormats {
  /** One table row: numeric id plus display name. */
  final case class County(id: Int, name: String)

  // Sample data: a stream of five generated rows.
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

  // Explicit type on the implicit: an inferred implicit type makes implicit
  // resolution depend on definition order, and Scala 3 rejects it outright.
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}

/** Demo server on localhost:8011: GET /rows streams the sample rows as JSON,
  * POST /rows consumes an uploaded row stream and answers with the row names.
  */
object HttpDBServer extends App {
  import Converters._

  implicit val httpSys: ActorSystem = ActorSystem("httpSystem")
  implicit val httpMat: ActorMaterializer = ActorMaterializer()
  implicit val httpEC: scala.concurrent.ExecutionContextExecutor = httpSys.dispatcher


  // JSON streaming support with parallel, order-preserving marshalling.
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  /** On a `RuntimeException`, discards the request entity bytes and completes
    * with status code 500 and the text "Upload Failed!".
    */
  def postExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case _: RuntimeException =>
        extractRequest { req =>
          // Drain what is left of the request body before answering.
          req.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))
        }
    }

  val route =
    path("rows") {
      get {
        // The Source itself is handed to `complete`.
        complete {
          source
        }
      } ~
      post {
        withoutSizeLimit { // lift the entity size limit for this route
          handleExceptions(postExceptionHandler) {
            // `rows` instead of `source`, which shadowed Converters.source.
            entity(asSourceOf[County]) { rows =>
              // Accumulate in a Vector: appending to a List with `++` copies the
              // accumulator for every element (quadratic in the row count).
              // NOTE(review): the seed keeps the leading "" so the JSON response
              // stays identical to the output shown in the article.
              val futofNames: Future[Vector[String]] =
                rows.runFold(Vector(""))((names, county) => names :+ county.name)
              complete(futofNames)
            }
          }
        }
      }
    }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  // Binding is asynchronous: report a failed bind (e.g. port already in use)
  // instead of leaving the banner below as the only output.
  bindingFuture.failed.foreach { err =>
    println(s"Failed to bind to $host:$port: ${err.getMessage}")
  }

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  // Unbind first, then stop the actor system whether or not unbinding succeeded.
  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

 

 

我们先设计服务端的数据下载部分：

这个隐式实例是由Spray-Json提供的，在SprayJsonSupport.scala里。
下面是这部分客户端的完整代码：

下面是这次讨论的示范代码：

 

 

 

 

  // Start the upload with the prepared request template and JSON entity.
  uploadRows(request,data)

["","广西壮族自治区地市县编号 #1","广西壮族自治区地市县编号 #2","广西壮族自治区地市县编号 #3","广西壮族自治区地市县编号 #4","广西壮族自治区地市县编号 #5"]

以上我们已经实现了客户端从服务端下载一段数据库表行，然后以Akka-stream的操作方式来处理下载数据。那么反向交换，即从客户端上传一段表行的话，就需要把一个Source[T,_]转换成Source[ByteString,_]然后放进HttpRequest的HttpEntity里。服务端收到数据后又要进行反向的转换，即把Request.Entity.dataBytes从Source[ByteString,_]转回Source[T,_]。Akka-http在客户端没有提供像complete这样强大的自动化功能。我们可能需要自定义并提供像ToRequestMarshaller[Source[T,_]]这样的隐式实例。但Akka-http的Marshalling-type-class是个非常复杂的系统。如果我们的目的只是简单提供一个Source[ByteString,_]，我们是否可以直接调用Spray-Json的函数来进行ROW->Json->ByteString转换呢？如下：

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling.Unmarshal

// Combines the spray-json protocol formats with akka-http's spray-json support.
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row model and its JSON format, made available through `import Converters._`. */
object Converters extends MyFormats {
  /** One table row: numeric id plus display name. */
  final case class County(id: Int, name: String)

  // Explicit type on the implicit: an inferred implicit type makes implicit
  // resolution depend on definition order, and Scala 3 rejects it outright.
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}

/** Demo client: downloads the rows served at `/rows` and prints each one. */
object HttpDBClient extends App {
  import Converters._

  implicit val sys: ActorSystem = ActorSystem("ClientSys")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = sys.dispatcher

  // Streaming support picked up implicitly when unmarshalling to Source[County, NotUsed].
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  /** Sends `request` and, on a 200 response, unmarshals the entity into a
    * stream of `County` rows that is printed element by element.
    * Non-OK responses and failures are reported on stdout.
    *
    * @param request the download request to send
    * @return the response future of the underlying `singleRequest` call
    */
  def downloadRows(request: HttpRequest) = {
    val futResp = Http(sys).singleRequest(request)
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          // `onSuccess` is deprecated since Scala 2.12 and silently dropped
          // unmarshalling failures; handle both outcomes explicitly.
          Unmarshal(entity).to[Source[County,NotUsed]].onComplete {
            case Success(rows) =>
              rows.runForeach(println).failed.foreach { err =>
                println(s"download failed: ${err.getMessage}")
              }
            case Failure(err) =>
              println(s"download failed: ${err.getMessage}")
              // NOTE(review): presumably the entity was not consumed when
              // unmarshalling fails, so drain it here — confirm.
              r.discardEntityBytes()
          }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download rows!")
        case Failure(err) => println(s"download failed: ${err.getMessage}")

      }
  }
  downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))

  // Keep the process alive until the user presses Enter, then shut down.
  scala.io.StdIn.readLine()

  sys.terminate()

}

 

 

 

 

     case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          // Unmarshal the response entity into a stream of County rows.
          val futSource = Unmarshal(entity).to[Source[County,NotUsed]]
          // `foreach` replaces `onSuccess`, which is deprecated since Scala 2.12.
          futSource.foreach { source =>
            source.runForeach(println)
          }

 

  // Row model: one County stands for one database table row.
  case class County(id: Int, name: String)
  // Stream of five generated sample rows.
  val source: Source[County, NotUsed] =
    Source(1 to 5).map(i => County(i, s"中国广东省地区编号 #$i"))