Akka-http data exchange supports streaming: file exchange begins with reading the bytes in a file

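File exchange here means uploading and downloading files between a server and a client over HTTP. As a system integration tool, Akka-http should offer efficient ways of exchanging data, covering both file exchange and the upload and download of database table rows. Akka-http's data exchange supports streaming: the data being exchanged can be a stream of elements of unbounded length. First, this removes the technical hurdles of plain HTTP, where large data sent via Multipart has to be split into parts and described with complicated message attributes. Second, users can conveniently process the data in depth with Akka-stream, without any intermediate data conversion. More importantly, Akka-http supports reactive-streams, which avoids the problems caused by mismatched transfer rates between sender and receiver. In this post we discuss two-way file transfer with Akka-http.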

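The content of any file, whether it sits on disk, in memory, or on the wire, is just a sequence of bytes. A file exchange consists of reading the bytes from a file, transferring those bytes, and finally writing them into a file. Every step operates on bytes, so the program may not need any data conversion at all. Akka provides a set of file read and write functions, as follows: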

  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)

  def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] =
    Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource")))

  def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] =
    toPath(f, options, startPosition = 0)

  def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] =
    Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink")))
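
The read, transfer, write flow described above can be sketched without any HTTP involved by connecting the file Source straight to the file Sink. This is a minimal sketch, not part of the original post: the object name FileCopySketch, the helper copyFile, and the temporary files are assumptions made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult, Materializer}
import akka.stream.scaladsl.FileIO
import java.nio.file.{Files, Path}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FileCopySketch {
  // Hypothetical helper: streams src into dst chunk by chunk.
  // The elements stay ByteString from the Source to the Sink, so nothing is converted.
  def copyFile(src: Path, dst: Path, chunkSize: Int = 8192)
              (implicit mat: Materializer): Future[IOResult] =
    FileIO.fromPath(src, chunkSize).runWith(FileIO.toPath(dst))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("copySketch")
    implicit val mat: Materializer = ActorMaterializer()

    // Temporary files stand in for real source and destination paths.
    val src = Files.createTempFile("copy-src", ".bin")
    val dst = Files.createTempFile("copy-dst", ".bin")
    Files.write(src, Array.fill[Byte](1000)(7))

    val result = Await.result(copyFile(src, dst, chunkSize = 256), 10.seconds)
    println(s"bytes written: ${result.count}")
    system.terminate()
  }
}
```

The materialized Future[IOResult] completes once the Sink has finished writing, and its count field reports how many bytes went through.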

As we can see, fromPath is a Source[ByteString,_] and toPath is a Sink[ByteString,_]. Both are already in stream form, so they should fit directly into the Entity of an HTTP message, as follows:

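  // Reads the file at filePath as a stream of ByteString chunks of at most chunkSize bytes.
  // Needs: import akka.http.scaladsl.model.HttpEntity.limitableByteSource
  def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {
    def loadFile = {
      val file = Paths.get(filePath)
      FileIO.fromPath(file, chunkSize)
        // run the blocking reads on a dedicated dispatcher; this id is assumed to be defined in application.conf
        .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
    }
    // wrap the source so that a size limit set on the entity can be enforced
    limitableByteSource(loadFile)
  }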

fileStream returns a Source[ByteString,_], so it can be put directly into an Entity:

  val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/text")
  val textData = HttpEntity(
    ContentTypes.`application/octet-stream`,
    fileStream("/Users/tiger-macpro/downloads/A4.TIF",256)
  )

Here fileStream goes into the entity of an HttpRequest. For an HttpResponse, the following approach can be used:

  val route = pathPrefix("file") {
    (get & path("text" / Remaining)) { fp =>
      withoutSizeLimit {
        complete(
          HttpEntity(
            ContentTypes.`application/octet-stream`,
            fileStream("/users/tiger-macpro/" + fp, 256))
        )
      }
    }
  }

Note that complete builds the HttpResponse. On the receiving side, entity.dataBytes is itself a Source[ByteString,_], so we can run it directly into a Sink:

          entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
            .onComplete { case _ => println(s"Download file saved to: $destPath") }
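
To see the same idea in isolation, the sketch below builds an entity from a stream of chunks and then drains its dataBytes, folding the chunks into one ByteString instead of writing them to a file. It is only an illustration: the object EntityBytesSketch and the helpers entityOf and drain are hypothetical names, not part of the original code.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object EntityBytesSketch {
  // Sender side: wrap a stream of chunks in a chunked entity.
  def entityOf(chunks: List[ByteString]): HttpEntity.Chunked =
    HttpEntity(ContentTypes.`application/octet-stream`, Source(chunks))

  // Receiver side: dataBytes is a Source[ByteString, Any]; here the chunks are
  // concatenated in memory, where the article's code uses FileIO.toPath as the Sink.
  def drain(entity: HttpEntity)(implicit mat: Materializer): Future[ByteString] =
    entity.dataBytes.runFold(ByteString.empty)(_ ++ _)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("entitySketch")
    implicit val mat: Materializer = ActorMaterializer()

    val entity = entityOf(List(ByteString("hello "), ByteString("world")))
    val bytes = Await.result(drain(entity), 10.seconds)
    println(bytes.utf8String)
    system.terminate()
  }
}
```

Folding everything into memory is only reasonable for small payloads; for large files a file Sink keeps memory use bounded by the chunk size.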

As mentioned above, FileIO.toPath is a Sink. Because our goal is to exchange very large files, both the upload and the download route use withoutSizeLimit:

 val route = pathPrefix("file") {
    (get & path("exchange" / Remaining)) { fp =>
      withoutSizeLimit {
        complete(
          HttpEntity(
            ContentTypes.`application/octet-stream`,
            fileStream("/users/tiger-macpro/" + fp, 256))
        )
      }
    } ~
      (post & path("exchange")) {
        withoutSizeLimit {
          extractDataBytes { bytes =>
            val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))
            onComplete(fut) { _ =>
              complete(s"Save upload file to: $destPath")
            }
          }
        }
      }
  }
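
To illustrate what a size limit does and why large transfers need it lifted, the sketch below applies withSizeLimit directly to an entity, outside of any route. It is a hedged illustration under the assumption that an entity whose stream exceeds its limit is terminated with an error when read; the names SizeLimitSketch, entityOf, and countBytes are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object SizeLimitSketch {
  // Builds a chunked entity carrying totalBytes bytes in a single chunk.
  def entityOf(totalBytes: Int): HttpEntity.Chunked =
    HttpEntity(ContentTypes.`application/octet-stream`,
      Source.single(ByteString(Array.fill[Byte](totalBytes)(1))))

  // Counts the bytes delivered by the entity; a Failure means the stream was aborted.
  def countBytes(entity: HttpEntity)(implicit mat: Materializer): Try[Int] =
    Try(Await.result(entity.dataBytes.runFold(0)(_ + _.length), 10.seconds))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("limitSketch")
    implicit val mat: Materializer = ActorMaterializer()

    // 1024 bytes against a 100-byte limit: reading is expected to fail.
    println(countBytes(entityOf(1024).withSizeLimit(100)))
    // The same payload against a 4096-byte limit: reading is expected to succeed.
    println(countBytes(entityOf(1024).withSizeLimit(4096)))
    system.terminate()
  }
}
```

The withoutSizeLimit directive used in the route switches this check off for the request entity, which is what allows files of arbitrary size to be uploaded.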

The sample code below demonstrates exchanging both text and binary files:

Server:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity._
import java.nio.file._

object FileServer extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  def fileStream(filePath: String, chunkSize: Int) = {
     def loadFile = {
       // the dispatcher named below is assumed to be defined in application.conf
       val file = Paths.get(filePath)
       FileIO.fromPath(file, chunkSize)
         .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
     }
    limitableByteSource(loadFile)
  }
  val destPath = "/users/tiger-macpro/downloads/A4-1.TIF"
  val route = pathPrefix("file") {
    (get & path("exchange" / Remaining)) { fp =>
      withoutSizeLimit {
        complete(
          HttpEntity(
            ContentTypes.`application/octet-stream`,
            fileStream("/users/tiger-macpro/" + fp, 256))
        )
      }
    } ~
      (post & path("exchange")) {
        withoutSizeLimit {
          extractDataBytes { bytes =>
            val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))
            onComplete(fut) { _ =>
              complete(s"Save upload file to: $destPath")
            }
          }
        }

      }
  }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

Client:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.limitableByteSource
import akka.http.scaladsl.model._
import java.nio.file._
import akka.util.ByteString
import scala.util._

object FileClient extends App {

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  def downloadFileTo(request: HttpRequest, destPath: String) = {
    val futResp = Http(sys).singleRequest(request)
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
            .onComplete { case _ => println(s"Download file saved to: $destPath") }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download file!")
        case Failure(err) => println(s"Download failed: ${err.getMessage}")
      }

  }

  val dlFile = "Downloads/readme.txt"
  val downloadText = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile)

  downloadFileTo(downloadText, "/users/tiger-macpro/downloads/sample.txt")
  scala.io.StdIn.readLine()

  val dlFile2 = "Downloads/image.png"
  val downloadText2 = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile2)
  downloadFileTo(downloadText2, "/users/tiger-macpro/downloads/sample.png")
  scala.io.StdIn.readLine()

  def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
        request.copy(entity = dataEntity)
      )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")

      }
  }

  def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {
    def loadFile = {
      // the dispatcher named below is assumed to be defined in application.conf
      val file = Paths.get(filePath)
      FileIO.fromPath(file, chunkSize)
        .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
    }
    limitableByteSource(loadFile)
  }

  val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/exchange")
  val textData = HttpEntity(
    ContentTypes.`application/octet-stream`,
    fileStream("/Users/tiger-macpro/downloads/readme.txt",256)
  )

  uploadFile(uploadText,textData)

  scala.io.StdIn.readLine()

  sys.terminate()


}