Akka-http的stream类型数据内容是以Source,Akka-http的stream类型数据内容是以Source

 
 在头里几篇探究里大家都提到过:Akka-http是一项系统融为一体工具库。它是以数据互换的格局开展系统融为一体的。所以,Akka-http的主干职能应该是数据沟通的落实了:应该能因而某种公开的数码格式和传导标准相比有利的兑现包罗异类系统里面通过网上进行的数据沟通。覆盖包含:数据编码、发送和多少接受、解析全经过。Akka-http提供了许多网上传输标准数据的概括模型以及数据类型转换方法,可以使编程人士很便宜的打造网上往来的Request和Response。可是,现实中的数据交流远远不止针对request和response操作可以知足的。系统之间数据调换平常提到文件或者数据库表类型的多寡上传下载。即便在Http标准中讲述了什么样通过MultiPart音讯类型进行批量数额的传输,可是那几个标准提到的达成细节包涵数据内容叙述、数据分段格局、新闻数据长度计算等等几乎可以马上令人却步。Akka-http是根据Akka-stream开发的:不但它的行事流程可以用Akka-stream来发布,它还帮助stream化的数量传输。大家精晓:Akka-stream提供了作用强大的FileIO和Data-Streaming,可以用Stream-Source代表文件或数据库数据源。简单的说:Akka-http的消息数据内容HttpEntity可以支撑理论上无与伦比长度的data-stream。最难能可贵的是:这一个Source是个Reactive-Stream-Source,具备了back-pressure机制,可以有效应付数据互换插手两方Reactive端点分歧的数据传输速率。

 
 在前方几篇琢磨里大家都涉嫌过:Akka-http是一项系统融为一体工具库。它是以数据沟通的款型开展系统融为一体的。所以,Akka-http的骨干职能应该是数据沟通的贯彻了:应该能因此某种公开的数目格式和传导标准相比便宜的完结包罗异类系统里面通过网上展开的数据交流。覆盖包含:数据编码、发送和数量接收、解析全经过。Akka-http提供了很多网上传输标准数量的席卷模型以及数据类型转换方法,可以使编程人士很方便的创设网上往来的Request和Response。不过,现实中的数据沟通远远不止针对request和response操作可以满足的。系统之间数据沟通常常提到文件或者数据库表类型的多少上传下载。即便在Http标准中描述了什么样通过MultiPart新闻类型进行批量数据的传输,可是这么些正式提到的落到实处细节包蕴数据内容叙述、数据分段方式、音讯数据长度计算等等大致可以即时令人却步。Akka-http是按照Akka-stream开发的:不但它的干活流程可以用Akka-stream来表述,它还协助stream化的数额传输。大家知道:Akka-stream提供了作用强大的FileIO和Data-Streaming,可以用Stream-Source代表文件或数据库数据源。一句话来说:Akka-http的音信数据内容HttpEntity可以协理理论上非凡长度的data-stream。最敬重的是:那些Source是个Reactive-Stream-Source,具备了back-pressure机制,可以使得应付数据交流参预两方Reactive端点差距的多寡传输速率。

 
Akka-http的stream类型数据内容是以Source[T,_]品种表示的。首先,Akka-stream通过FileIO对象提供了丰裕多的file-io操作函数,其中有个fromPath函数可以用某个文件内容数据打造一个Source类型:

 
Akka-http的stream类型数据内容是以Source[T,_]品种表示的。首先,Akka-stream通过FileIO对象提供了足足多的file-io操作函数,其中有个fromPath函数能够用某个文件内容数据创设一个Source类型:

/**
   * Creates a Source from a files contents.
   * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
   * except the final element, which will be up to `chunkSize` in size.
   *
   * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
   * set it for a given Source by using [[akka.stream.ActorAttributes]].
   *
   * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
   * and a possible exception if IO operation was not completed successfully.
   *
   * @param f         the file path to read from
   * @param chunkSize the size of each read operation, defaults to 8192
   */
  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)
/**
   * Creates a Source from a files contents.
   * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
   * except the final element, which will be up to `chunkSize` in size.
   *
   * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
   * set it for a given Source by using [[akka.stream.ActorAttributes]].
   *
   * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
   * and a possible exception if IO operation was not completed successfully.
   *
   * @param f         the file path to read from
   * @param chunkSize the size of each read operation, defaults to 8192
   */
  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)

本条函数创设了Source[ByteString,Future[IOResult]],大家须要把ByteString转化成MessageEntity。首先须要在implicit-scope内提供马尔斯haller[ByteString,MessageEntity]品类的隐式实例:

其一函数打造了Source[ByteString,Future[IOResult]],大家须求把ByteString转化成MessageEntity。首先须要在implicit-scope内提供马尔斯haller[ByteString,MessageEntity]品类的隐式实例:

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._
...
trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._
...

俺们还需求Json-Streaming帮忙:

大家还索要Json-Streaming接济:

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)
  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)

FileIO是blocking操作,大家还足以选拔独立的线程供blocking操作使用:

FileIO是blocking操作,我们还能拔取独立的线程供blocking操作使用:

   FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
   FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))

现行我们得以从在server上用一个文书创设Source然后再转成Response:

近年来大家能够从在server上用一个文本打造Source然后再转成Response:

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } 
    }
  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }
  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } 
    }
  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }

一律,大家也足以把数据库表内数据转成Akka-Stream-Source,然后再落到实处到MessageEntity的变换。转换进度包蕴用Query读取数据库表内数据后转成Reactive-Publisher,然后把publisher转成Akka-Stream-Source,如下:

无异于,大家也可以把数据库表内数据转成Akka-Stream-Source,然后再得以完结到MessageEntity的更换。转换进程包蕴用Query读取数据库表内数据后转成Reactive-Publisher,然后把publisher转成Akka-Stream-Source,如下:

object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}
object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}

接下来进行到MessageEntity的更换:

下一场举行到MessageEntity的转换:

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }
  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

上面是这一次示范的完全源代码:

上边是这次示范的全部源代码:

import java.nio.file._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.common._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson


object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)



  val (port, host) = (8011,"localhost")

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}
import java.nio.file._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.common._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson


object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)



  val (port, host) = (8011,"localhost")

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

相关文章