在客户端可以用下面的方法提供自己的用户身份信息

 
当我们把Akka-http作为数据库数据交换工具时,数据是以Source[ROW,_]形式存放在Entity里的。很多时候除数据之外我们可能还需要进行一些附加的信息传递,如对数据的具体处理方式等。我们可以通过Akka-http的raw-header来实现附加自定义消息的传递,这项功能可以通过Akka-http提供的raw-header筛选功能来实现。在客户端我们把附加消息放在HttpRequest的raw header里,如下:

当我们把Akka-http作为数据库数据交换工具时,数据是以Source[ROW,_]形式存放在Entity里的。很多时候除数据之外我们可能还需要进行一些附加的信息传递,如对数据的具体处理方式等。我们可以通过Akka-http的raw-header来实现附加自定义消息的传递,这项功能可以通过Akka-http提供的raw-header筛选功能来实现。在客户端我们把附加消息放在HttpRequest的raw header里,如下:

  import akka.http.scaladsl.model.headers._
  // Builds the POST request for the row-upload endpoint. The custom "action"
  // raw header carries the processing instruction that accompanies the data.
  val request = {
    val actionHeader = RawHeader("action", "insert:county")
    HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8011/rows")
      .addHeader(actionHeader)
  }
  import akka.http.scaladsl.model.headers._  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")                   .addHeader(RawHeader("action","insert:county"))

在这里客户端注明上传数据应插入county表。服务端可以像下面这样获取这项信息:

在这里客户端注明上传数据应插入county表。服务端可以像下面这样获取这项信息:

             // Route fragment: reads the optional "action" request header and, when it is
             // present, consumes the uploaded County stream before replying.
             optionalHeaderValueByName("action") {
               case Some(action) =>
                 entity(asSourceOf[County]) { rows =>
                   // Collect the county names. The fold starts from an empty Vector so no
                   // placeholder element ends up in the result and appends stay cheap.
                   val futOfNames: Future[Vector[String]] =
                     rows.runFold(Vector.empty[String])((names, county) => names :+ county.name)
                   // Reply only once the whole upload has been consumed. A failed stream
                   // now fails the request instead of being silently dropped.
                   onSuccess(futOfNames) { _ =>
                     complete(s"Received rows for $action")
                   }
                 }
               case None => complete("No action specified!")
             }
             optionalHeaderValueByName("action") {                case Some =>                  entity(asSourceOf[County]) { source =>                    val futofNames: Future[List[String]] =                      source.runFold(List[String]("")) => acc ++ List                    complete(s"Received rows for $action")                  }                case None => complete ("No action specified!")              }

Akka-http通过Credential类的Directive提供了authentication和authorization。在客户端可以用下面的方法提供自己的用户身份信息:

Akka-http通过Credential类的Directive提供了authentication和authorization。在客户端可以用下面的方法提供自己的用户身份信息:

  import akka.http.scaladsl.model.headers._
  // Builds the POST request for the row-upload endpoint, carrying both the custom
  // "action" raw header and HTTP Basic credentials for the user "john".
  val request = {
    val actionHeader = RawHeader("action", "insert:county")
    val userCredentials = BasicHttpCredentials("john", "p4ssw0rd")
    HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8011/rows")
      .addHeader(actionHeader)
      .addCredentials(userCredentials)
  }
  import akka.http.scaladsl.model.headers._  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")    .addHeader(RawHeader("action","insert:county"))    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

服务端对客户端的身份验证处理方法如下:

服务端对客户端的身份验证处理方法如下:

  import akka.http.scaladsl.server.directives.Credentials
  /** Asynchronous Basic-auth authenticator.
    *
    * @param credentials credentials extracted from the request, if any
    * @return a future holding `Some(User(id))` when credentials were provided and
    *         their secret verifies against the fixed password, `None` otherwise
    */
  def myUserPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
    // NOTE(review): this dispatcher id has to exist in the actor system's
    // configuration; confirm the spelling against application.conf.
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    credentials match {
      case provided @ Credentials.Provided(userId) =>
        // The check runs inside a Future on the dispatcher looked up above.
        Future {
          val secretMatches = provided.verify("p4ssw0rd")
          if (secretMatches) Some(User(userId)) else None
        }
      case _ =>
        // No credentials supplied: nothing to verify.
        Future.successful(None)
    }
  }

  /** Authenticated principal, identified only by its user name. */
  case class User(name: String)

  /** User names that pass the permission check below. */
  val validUsers = Set("john", "peter", "tiger", "susan")

  /** Reports whether `user` is listed in `validUsers`.
    *
    * The answer is computed synchronously and wrapped in an already-completed
    * future, so no execution context (and no dispatcher lookup) is needed.
    *
    * @param user the authenticated user to check
    * @return a completed future holding `true` when the user's name is in `validUsers`
    */
  def hasAdminPermissions(user: User): Future[Boolean] =
    Future.successful(validUsers.contains(user.name))
  import akka.http.scaladsl.server.directives.Credentials  def myUserPassAuthenticator(credentials: Credentials): Future[Option[User]] = {    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")    credentials match {      case p @ Credentials.Provided =>        Future {          // potentially          if (p.verify("p4ssw0rd")) Some          else None        }      case _ => Future.successful    }  }  case class User(name: String)  val validUsers = Set("john","peter","tiger","susan")  def hasAdminPermissions(user: User): Future[Boolean] = {    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")    Future.successful(validUsers.contains(user.name))  }

下面是Credential-Directive的使用方法:

下面是Credential-Directive的使用方法:

         // Route fragment: authenticate with HTTP Basic, authorize the resulting user,
         // then accept an unbounded upload of County rows tagged by the "action" header.
         authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
           authorizeAsync(_ => hasPermissions(user)) {
             withoutSizeLimit {
               handleExceptions(postExceptionHandler) {
                 optionalHeaderValueByName("action") {
                   case Some(action) =>
                     entity(asSourceOf[County]) { rows =>
                       // Collect the county names, starting from an empty Vector so no
                       // placeholder element is included and appends stay cheap.
                       val futOfNames: Future[Vector[String]] =
                         rows.runFold(Vector.empty[String])((names, county) => names :+ county.name)
                       // Reply only after the upload has been fully consumed. A failed
                       // stream is passed on to the enclosing exception handler instead
                       // of being silently dropped.
                       onSuccess(futOfNames) { _ =>
                         complete(s"Received rows for $action sent from $user")
                       }
                     }
                   case None => complete(s"$user did not specify action for uploaded rows!")
                 }
               }
             }
           }
         }
         authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>            authorizeAsync(_ => hasPermissions {              withoutSizeLimit {                handleExceptions(postExceptionHandler) {                  optionalHeaderValueByName("action") {                    case Some =>                      entity(asSourceOf[County]) { source =>                        val futofNames: Future[List[String]] =                          source.runFold(List[String]("")) => acc ++ List                        complete(s"Received rows for $action sent from $user")                      }                    case None => complete(s"$user did not specify action for uploaded rows!")                  }                }              }            }          }

下面是本次讨论的示范代码:

下面是本次讨论的示范代码:

客户端:

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.model._
import spray.json._

/** Combines Akka HTTP's spray-json (un)marshalling support with the default
  * spray-json protocol.
  */
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Data model and JSON format shared by the client demo. */
object Converters extends MyFormats {
  /** One row of the `county` data set. */
  case class County(id: Int, name: String)

  // The explicit type keeps implicit resolution independent of definition order
  // and of the type inferred for the right-hand side.
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}

/** Demo client: streams five `County` rows as JSON to the server's `/rows`
  * endpoint, sending an "action" raw header and HTTP Basic credentials, then
  * prints the outcome and waits for a line on stdin before shutting down.
  */
object HttpClientDemo extends App {
  import Converters._

  // Implicits carry explicit types so resolution does not depend on inference.
  implicit val sys: ActorSystem = ActorSystem("ClientSys")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = sys.dispatcher

  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  // Sample rows to upload.
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"广西壮族自治区地市县编号 #$i") }

  /** Serializes one row to the bytes of its JSON text. */
  def countyToByteString(c: County): ByteString =
    ByteString(c.toJson.toString)

  val flowCountyToByteString: Flow[County, ByteString, NotUsed] = Flow.fromFunction(countyToByteString)

  val rowBytes = limitableByteSource(source via flowCountyToByteString)

  import akka.http.scaladsl.model.headers._
  // The "action" header tells the server what to do with the rows; the
  // credentials identify the uploading user.
  val request = HttpRequest(HttpMethods.POST, uri = "http://localhost:8011/rows")
    .addHeader(RawHeader("action", "insert:county"))
    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

  val data = HttpEntity(
    ContentTypes.`application/json`,
    rowBytes
  )

  /** Sends `request` with `dataEntity` as its body and reports the outcome on stdout.
    *
    * @param request    request template (method, URI, headers)
    * @param dataEntity entity to attach to the request
    * @return the response future; the printing below is a side effect only
    */
  def uploadRows(request: HttpRequest, dataEntity: RequestEntity): scala.concurrent.Future[HttpResponse] =
    Http(sys)
      .singleRequest(request.copy(entity = dataEntity))
      .andThen {
        case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        // Any other status: report it and drain the entity so the connection is freed.
        // This pattern matches every remaining response, so no further Success case is needed.
        case Success(r @ HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")
      }

  uploadRows(request, data)

  // Keep the process alive until the user presses Enter, then shut down.
  scala.io.StdIn.readLine()

  sys.terminate()

}
import akka.actor._import akka.stream._import akka.stream.scaladsl._import akka.http.scaladsl.Httpimport scala.util._import akka._import akka.http.scaladsl.common._import spray.json.DefaultJsonProtocolimport akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupportimport akka.http.scaladsl.common.EntityStreamingSupportimport akka.http.scaladsl.model._import spray.json._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocolobject Converters extends MyFormats {  case class County(id: Int, name: String)  implicit val countyFormat = jsonFormat2}object HttpClientDemo extends App {  import Converters._  implicit val sys = ActorSystem("ClientSys")  implicit val mat = ActorMaterializer()  implicit val ec = sys.dispatcher  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()  import akka.util.ByteString  import akka.http.scaladsl.model.HttpEntity.limitableByteSource  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}  def countyToByteString(c: County) = {    ByteString(c.toJson.toString)  }  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)  val rowBytes = limitableByteSource(source via flowCountyToByteString)  import akka.http.scaladsl.model.headers._  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")    .addHeader(RawHeader("action","insert:county"))    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))  val data = HttpEntity(    ContentTypes.`application/json`,    rowBytes  )  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {    val futResp = Http.singleRequest(      request.copy(entity = dataEntity)    )    futResp      .andThen {        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>          entity.dataBytes.map(_.utf8String).runForeach        case Success(r@HttpResponse(code, _, _, _)) =>          println(s"Upload request failed, response code: $code")     
     r.discardEntityBytes()        case Success => println("Unable to Upload file!")        case Failure => println(s"Upload failed: ${err.getMessage}")      }  }  uploadRows(request,data)  scala.io.StdIn.readLine()  sys.terminate()}

服务端:

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._

/** Combines Akka HTTP's spray-json (un)marshalling support with the default
  * spray-json protocol.
  */
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Data model, sample data and JSON format shared by the server demo. */
object Converters extends MyFormats {
  /** One row of the `county` data set. */
  case class County(id: Int, name: String)

  // Sample rows served by the GET route.
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

  // The explicit type keeps implicit resolution independent of definition order
  // and of the type inferred for the right-hand side.
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}

/** Demo server exposing `/rows`:
  *  - GET streams the sample `County` rows as JSON;
  *  - POST requires HTTP Basic authentication plus authorization, then consumes
  *    an uploaded stream of `County` rows tagged by an optional "action" header.
  */
object HttpServerDemo extends App {

  import Converters._

  // Implicits carry explicit types so resolution does not depend on inference.
  implicit val httpSys: ActorSystem = ActorSystem("httpSystem")
  implicit val httpMat: ActorMaterializer = ActorMaterializer()
  implicit val httpEC: ExecutionContextExecutor = httpSys.dispatcher

  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  /** Turns a `RuntimeException` raised while handling an upload into a 500
    * response, discarding whatever is left of the request entity first.
    */
  def postExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case _: RuntimeException =>
        extractRequest { req =>
          req.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue, "Upload Failed!"))
        }
    }

  import akka.http.scaladsl.server.directives.Credentials

  /** Asynchronous Basic-auth authenticator.
    *
    * @param credentials credentials extracted from the request, if any
    * @return a future holding `Some(User(id))` when credentials were provided and
    *         their secret verifies against the fixed password, `None` otherwise
    */
  def userPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
    // Left untyped on purpose: the inferred dispatcher type is more specific than
    // `httpEC`, which is what lets the `Future` below pick this one up.
    // NOTE(review): the dispatcher id must exist in configuration; confirm its spelling.
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    credentials match {
      case p @ Credentials.Provided(id) =>
        // The check runs inside a Future on the dispatcher looked up above.
        Future {
          if (p.verify("p4ssw0rd")) Some(User(id))
          else None
        }
      case _ => Future.successful(None)
    }
  }

  /** Authenticated principal, identified only by its user name. */
  case class User(name: String)

  /** User names that pass the permission check below. */
  val validUsers = Set("john", "peter", "tiger", "susan")

  /** Reports whether `user` is listed in `validUsers`. The result is already
    * computed, so no execution context or dispatcher lookup is required.
    */
  def hasPermissions(user: User): Future[Boolean] =
    Future.successful(validUsers.contains(user.name))

  val route =
    path("rows") {
      get {
        complete {
          source
        }
      } ~
        post {
          authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
            authorizeAsync(_ => hasPermissions(user)) {
              withoutSizeLimit {
                handleExceptions(postExceptionHandler) {
                  optionalHeaderValueByName("action") {
                    case Some(action) =>
                      // Named `rows` so it does not shadow `Converters.source`.
                      entity(asSourceOf[County]) { rows =>
                        // Collect the county names, starting from an empty Vector so
                        // no placeholder element is included and appends stay cheap.
                        val futOfNames: Future[Vector[String]] =
                          rows.runFold(Vector.empty[String])((names, county) => names :+ county.name)
                        // Reply only after the upload has been fully consumed. A failed
                        // stream now reaches `postExceptionHandler` instead of being
                        // silently dropped.
                        onSuccess(futOfNames) { _ =>
                          complete(s"Received rows for $action sent from $user")
                        }
                      }
                    case None => complete(s"$user did not specify action for uploaded rows!")
                  }
                }
              }
            }
          }
        }
    }

  val (port, host) = (8011, "localhost")

  val bindingFuture = Http().bindAndHandle(route, host, port)

  println(s"Server running at $host $port. Press any key to exit ...")

  // Block until a line is read from stdin, then unbind and stop the actor system.
  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}
import akka.actor._import akka.stream._import akka.stream.scaladsl._import akka.http.scaladsl.Httpimport akka._import akka.http.scaladsl.common._import spray.json.DefaultJsonProtocolimport akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupportimport scala.concurrent._import akka.http.scaladsl.server._import akka.http.scaladsl.server.Directives._import akka.http.scaladsl.model._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocolobject Converters extends MyFormats {  case class County(id: Int, name: String)  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }  implicit val countyFormat = jsonFormat2}object HttpServerDemo extends App {  import Converters._  implicit val httpSys = ActorSystem("httpSystem")  implicit val httpMat = ActorMaterializer()  implicit val httpEC = httpSys.dispatcher  implicit val jsonStreamingSupport = EntityStreamingSupport.json()    .withParallelMarshalling(parallelism = 8, unordered = false)  def postExceptionHandler: ExceptionHandler =    ExceptionHandler {      case _: RuntimeException =>        extractRequest { req =>          req.discardEntityBytes()          complete((StatusCodes.InternalServerError.intValue, "Upload Failed!"))        }    }  import akka.http.scaladsl.server.directives.Credentials  def userPassAuthenticator(credentials: Credentials): Future[Option[User]] = {    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")    credentials match {      case p @ Credentials.Provided =>        Future {          // potentially          if (p.verify("p4ssw0rd")) Some          else None        }      case _ => Future.successful    }  }  case class User(name: String)  val validUsers = Set("john","peter","tiger","susan")  def hasPermissions(user: User): Future[Boolean] = {    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")    Future.successful(validUsers.contains(user.name))  }  val route 
=    path("rows") {      get {        complete {          source        }      } ~        post {          authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>            authorizeAsync(_ => hasPermissions {              withoutSizeLimit {                handleExceptions(postExceptionHandler) {                  optionalHeaderValueByName("action") {                    case Some =>                      entity(asSourceOf[County]) { source =>                        val futofNames: Future[List[String]] =                          source.runFold(List[String]("")) => acc ++ List                        complete(s"Received rows for $action sent from $user")                      }                    case None => complete(s"$user did not specify action for uploaded rows!")                  }                }              }            }          }        }    }  val (port, host) = (8011,"localhost")  val bindingFuture = Http().bindAndHandle(route,host,port)  println(s"Server running at $host $port. Press any key to exit ...")  scala.io.StdIn.readLine()  bindingFuture.flatMap(_.unbind    .onComplete(_ => httpSys.terminate}

 

 

 

 

 

 

 

 

 

 

相关文章