在客户端可以用上面的办法提供自己的用户地点新闻,在客户端可以用上边的点子提供自己的用户地点音讯

 
 当大家把Akka-http作为数据库数据交流工具时,数据是以Source[ROW,_]花样存放在Entity里的。很多时候除数量之外咱们兴许需求进行部分叠加的音信传递如对数据的切切实实处理方式等。大家得以由此Akka-http的raw-header来达成附加自定义信息的传递,那项功作用够经过Akka-http提供的raw-header筛选功能来促成。在客户端大家把附加新闻放在HttpRequest的raw
header里,如下:

 
 当我们把Akka-http作为数据库数据沟通工具时,数据是以Source[ROW,_]款式存放在Entity里的。很多时候除数量之外大家也许须要开展局地叠加的信息传递如对数据的有血有肉处理格局等。大家得以因此Akka-http的raw-header来完结附加自定义音信的传递,那项功用可以透过Akka-http提供的raw-header筛选成效来完成。在客户端大家把附加消息放在HttpRequest的raw
header里,如下:

  import akka.http.scaladsl.model.headers._
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
                   .addHeader(RawHeader("action","insert:county"))
  import akka.http.scaladsl.model.headers._
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
                   .addHeader(RawHeader("action","insert:county"))

在那边客户端注脚上传数据应插入county表。服务端可以像上边那样获取这项音讯:

在那里客户端注脚上传数据应插入county表。服务端可以像下边这样获取那项信息:

             optionalHeaderValueByName("action") {
                case Some(action) =>
                  entity(asSourceOf[County]) { source =>
                    val futofNames: Future[List[String]] =
                      source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                    complete(s"Received rows for $action")
                  }
                case None => complete ("No action specified!")
              }
             optionalHeaderValueByName("action") {
                case Some(action) =>
                  entity(asSourceOf[County]) { source =>
                    val futofNames: Future[List[String]] =
                      source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                    complete(s"Received rows for $action")
                  }
                case None => complete ("No action specified!")
              }

Akka-http通过Credential类的Directive提供了authentication和authorization。在客户端可以用上面的措施提供温馨的用户身份新闻:

Akka-http通过Credential类的Directive提供了authentication和authorization。在客户端可以用上边的法子提供自己的用户身份新闻:

  import akka.http.scaladsl.model.headers._
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
    .addHeader(RawHeader("action","insert:county"))
    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))
  import akka.http.scaladsl.model.headers._
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
    .addHeader(RawHeader("action","insert:county"))
    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

服务端对客户端的身份验证处理格局如下:

服务端对客户端的身份验证处理措施如下:

  import akka.http.scaladsl.server.directives.Credentials
  def myUserPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    credentials match {
      case p @ Credentials.Provided(id) =>
        Future {
          // potentially
          if (p.verify("p4ssw0rd")) Some(User(id))
          else None
        }
      case _ => Future.successful(None)
    }
  }

  case class User(name: String)
  val validUsers = Set("john","peter","tiger","susan")
  def hasAdminPermissions(user: User): Future[Boolean] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    Future.successful(validUsers.contains(user.name))
  }
  import akka.http.scaladsl.server.directives.Credentials
  def myUserPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    credentials match {
      case p @ Credentials.Provided(id) =>
        Future {
          // potentially
          if (p.verify("p4ssw0rd")) Some(User(id))
          else None
        }
      case _ => Future.successful(None)
    }
  }

  case class User(name: String)
  val validUsers = Set("john","peter","tiger","susan")
  def hasAdminPermissions(user: User): Future[Boolean] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    Future.successful(validUsers.contains(user.name))
  }

上边是Credential-Directive的使用方法:

下边是Credential-Directive的采纳格局:

         authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
            authorizeAsync(_ => hasPermissions(user)) {
              withoutSizeLimit {
                handleExceptions(postExceptionHandler) {
                  optionalHeaderValueByName("action") {
                    case Some(action) =>
                      entity(asSourceOf[County]) { source =>
                        val futofNames: Future[List[String]] =
                          source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                        complete(s"Received rows for $action sent from $user")
                      }
                    case None => complete(s"$user did not specify action for uploaded rows!")
                  }
                }
              }
            }
          }
         authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
            authorizeAsync(_ => hasPermissions(user)) {
              withoutSizeLimit {
                handleExceptions(postExceptionHandler) {
                  optionalHeaderValueByName("action") {
                    case Some(action) =>
                      entity(asSourceOf[County]) { source =>
                        val futofNames: Future[List[String]] =
                          source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                        complete(s"Received rows for $action sent from $user")
                      }
                    case None => complete(s"$user did not specify action for uploaded rows!")
                  }
                }
              }
            }
          }

下边是这次啄磨的以身作则代码:

上面是此次商讨的演示代码:

客户端:

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.model._
import spray.json._

trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  implicit val countyFormat = jsonFormat2(County)
}

object HttpClientDemo extends App {
  import Converters._

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}
  def countyToByteString(c: County) = {
    ByteString(c.toJson.toString)
  }
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)

  val rowBytes = limitableByteSource(source via flowCountyToByteString)

  import akka.http.scaladsl.model.headers._
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
    .addHeader(RawHeader("action","insert:county"))
    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

  val data = HttpEntity(
    ContentTypes.`application/json`,
    rowBytes
  )

  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
      request.copy(entity = dataEntity)
    )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")

      }
  }


  uploadRows(request,data)

  scala.io.StdIn.readLine()

  sys.terminate()

}
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.model._
import spray.json._

trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  implicit val countyFormat = jsonFormat2(County)
}

object HttpClientDemo extends App {
  import Converters._

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}
  def countyToByteString(c: County) = {
    ByteString(c.toJson.toString)
  }
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)

  val rowBytes = limitableByteSource(source via flowCountyToByteString)

  import akka.http.scaladsl.model.headers._
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
    .addHeader(RawHeader("action","insert:county"))
    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

  val data = HttpEntity(
    ContentTypes.`application/json`,
    rowBytes
  )

  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
      request.copy(entity = dataEntity)
    )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")

      }
  }


  uploadRows(request,data)

  scala.io.StdIn.readLine()

  sys.terminate()

}

服务端:

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._

trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }
  implicit val countyFormat = jsonFormat2(County)
}

object HttpServerDemo extends App {

  import Converters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher


  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  def postExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case _: RuntimeException =>
        extractRequest { req =>
          req.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue, "Upload Failed!"))
        }
    }

  import akka.http.scaladsl.server.directives.Credentials
  def userPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    credentials match {
      case p @ Credentials.Provided(id) =>
        Future {
          // potentially
          if (p.verify("p4ssw0rd")) Some(User(id))
          else None
        }
      case _ => Future.successful(None)
    }
  }

  case class User(name: String)
  val validUsers = Set("john","peter","tiger","susan")
  def hasPermissions(user: User): Future[Boolean] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    Future.successful(validUsers.contains(user.name))
  }

  val route =
    path("rows") {
      get {
        complete {
          source
        }
      } ~
        post {
          authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
            authorizeAsync(_ => hasPermissions(user)) {
              withoutSizeLimit {
                handleExceptions(postExceptionHandler) {
                  optionalHeaderValueByName("action") {
                    case Some(action) =>
                      entity(asSourceOf[County]) { source =>
                        val futofNames: Future[List[String]] =
                          source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                        complete(s"Received rows for $action sent from $user")
                      }
                    case None => complete(s"$user did not specify action for uploaded rows!")
                  }
                }
              }
            }
          }
        }
    }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._

trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }
  implicit val countyFormat = jsonFormat2(County)
}

object HttpServerDemo extends App {

  import Converters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher


  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  def postExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case _: RuntimeException =>
        extractRequest { req =>
          req.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue, "Upload Failed!"))
        }
    }

  import akka.http.scaladsl.server.directives.Credentials
  def userPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    credentials match {
      case p @ Credentials.Provided(id) =>
        Future {
          // potentially
          if (p.verify("p4ssw0rd")) Some(User(id))
          else None
        }
      case _ => Future.successful(None)
    }
  }

  case class User(name: String)
  val validUsers = Set("john","peter","tiger","susan")
  def hasPermissions(user: User): Future[Boolean] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    Future.successful(validUsers.contains(user.name))
  }

  val route =
    path("rows") {
      get {
        complete {
          source
        }
      } ~
        post {
          authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
            authorizeAsync(_ => hasPermissions(user)) {
              withoutSizeLimit {
                handleExceptions(postExceptionHandler) {
                  optionalHeaderValueByName("action") {
                    case Some(action) =>
                      entity(asSourceOf[County]) { source =>
                        val futofNames: Future[List[String]] =
                          source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                        complete(s"Received rows for $action sent from $user")
                      }
                    case None => complete(s"$user did not specify action for uploaded rows!")
                  }
                }
              }
            }
          }
        }
    }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

相关文章