diff --git a/src/main/scala/com/twitter/diffy/ApiController.scala b/src/main/scala/com/twitter/diffy/ApiController.scala index 90e5cbfd3..7e0bec69b 100644 --- a/src/main/scala/com/twitter/diffy/ApiController.scala +++ b/src/main/scala/com/twitter/diffy/ApiController.scala @@ -33,7 +33,7 @@ class ApiController @Inject()( JoinedEndpoint: JoinedEndpoint, includeWeights: Boolean, excludeNoise: Boolean - ) = + ) : Map[String, Map[String, Any]] = Map( "endpoint" -> Renderer.endpoint(JoinedEndpoint.endpoint), "fields" -> diff --git a/src/main/scala/com/twitter/diffy/proxy/ClientService.scala b/src/main/scala/com/twitter/diffy/proxy/ClientService.scala index 6d2bca675..a06ce63f3 100644 --- a/src/main/scala/com/twitter/diffy/proxy/ClientService.scala +++ b/src/main/scala/com/twitter/diffy/proxy/ClientService.scala @@ -1,7 +1,7 @@ package com.twitter.diffy.proxy -import com.twitter.finagle.{Addr, Name, Resolver, Service} import com.twitter.finagle.thrift.ThriftClientRequest +import com.twitter.finagle.{Addr, Name, Service} import com.twitter.util.{Time, Var} import org.jboss.netty.handler.codec.http.{HttpRequest, HttpResponse} @@ -42,7 +42,7 @@ case class ThriftService( } } - private[this] def sizeChange(size: Int) { + private[this] def sizeChange(size: Int) : Unit = { changeCount += 1 if (changeCount > 1) { changedAt = Some(Time.now) diff --git a/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala b/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala index b63c17cc5..3b1d5370f 100644 --- a/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala +++ b/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala @@ -25,12 +25,15 @@ object DifferenceProxyModule extends TwitterModule { } object DifferenceProxy { + val AllLabels: Seq[String] = Seq("primary", "secondary", "candidate") + object NoResponseException extends Exception("No responses provided by diffy") val NoResponseExceptionFuture = Future.exception(NoResponseException) val log = Logger(classOf[DifferenceProxy]) } trait DifferenceProxy { + import DifferenceProxy._ type Req diff --git a/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala b/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala index c5d4fdd7f..1505f3e7a 100644 --- a/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala +++ b/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala @@ -2,15 +2,17 @@ package com.twitter.diffy.proxy import java.net.SocketAddress -import com.twitter.diffy.analysis.{DifferenceAnalyzer, JoinedDifferences, InMemoryDifferenceCollector} +import com.twitter.diffy.analysis.{DifferenceAnalyzer, InMemoryDifferenceCollector, JoinedDifferences} import com.twitter.diffy.lifter.{HttpLifter, Message} import com.twitter.diffy.proxy.DifferenceProxy.NoResponseException -import com.twitter.finagle.{Service, Http, Filter} -import com.twitter.finagle.http.{Status, Response, Method, Request} -import com.twitter.util.{Try, Future} -import org.jboss.netty.handler.codec.http.{HttpResponse, HttpRequest} +import com.twitter.diffy.proxy.http.filter.{CloneHttpRequestFilter, RefineHttpHeadersByLabelFilter} +import com.twitter.finagle.http.{Method, Request, Response, Status} +import com.twitter.finagle.{Filter, Http, Service} +import com.twitter.util.{Future, Try} +import org.jboss.netty.handler.codec.http.{HttpRequest, HttpResponse} object HttpDifferenceProxy { + val okResponse = Future.value(Response(Status.Ok)) val noResponseExceptionFilter = @@ -34,8 +36,12 @@ trait HttpDifferenceProxy extends DifferenceProxy { override type Rep = HttpResponse override type Srv = HttpService + override def serviceFactory(serverset: String, label: String) = - HttpService(Http.newClient(serverset, label).toService) + HttpService( + CloneHttpRequestFilter.apply. + andThen(RefineHttpHeadersByLabelFilter(label, DifferenceProxy.AllLabels)). + andThen(Http.newClient(serverset, label).toService)) override lazy val server = Http.serve( diff --git a/src/main/scala/com/twitter/diffy/proxy/http/filter/CloneHttpRequestFilter.scala b/src/main/scala/com/twitter/diffy/proxy/http/filter/CloneHttpRequestFilter.scala new file mode 100644 index 000000000..7f7c0ad31 --- /dev/null +++ b/src/main/scala/com/twitter/diffy/proxy/http/filter/CloneHttpRequestFilter.scala @@ -0,0 +1,28 @@ +package com.twitter.diffy.proxy.http.filter + +import com.twitter.finagle.Filter +import com.twitter.util.Future +import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpHeaders, HttpRequest, HttpResponse} + +import scala.collection.JavaConverters._ + +object CloneHttpRequestFilter { + + type Req = HttpRequest + type Res = HttpResponse + + private def mkCloneRequest(reqIn:Req, filter: (Req => Future[Res])):Future[Res] = { + def copyHeader(headers:HttpHeaders)(k: String, v:String) : Unit = { headers.add(k, v)} + + val reqOut: DefaultHttpRequest = new DefaultHttpRequest(reqIn.getProtocolVersion, reqIn.getMethod, reqIn.getUri) + reqOut.setChunked(reqIn.isChunked) + reqOut.setContent(reqIn.getContent) + + val headers = for {entry <- reqIn.headers().asScala} yield (entry.getKey, entry.getValue) + headers.foreach((copyHeader(reqOut.headers)_).tupled) + + filter(reqOut) + } + + def apply(): Filter[Req, Res, Req, Res] = Filter.mk(mkCloneRequest) +} diff --git a/src/main/scala/com/twitter/diffy/proxy/http/filter/HeaderEffect.scala b/src/main/scala/com/twitter/diffy/proxy/http/filter/HeaderEffect.scala new file mode 100644 index 000000000..dfef107e7 --- /dev/null +++ b/src/main/scala/com/twitter/diffy/proxy/http/filter/HeaderEffect.scala @@ -0,0 +1,32 @@ +package com.twitter.diffy.proxy.http.filter + +import org.jboss.netty.handler.codec.http.HttpHeaders + +sealed trait HeaderEffect { + def apply(headers:HttpHeaders) : Unit +} + +case class Rewrite(oldName:String, newName:String) extends HeaderEffect { + def apply(headers:HttpHeaders): Unit ={ + val temp = headers.get(oldName) + headers.remove(oldName) + headers.set(newName, temp) + } +} + +case class Remove( name:String) extends HeaderEffect { + def apply(headers:HttpHeaders): Unit = { + headers.remove(name) + } +} + +object HeaderEffect { + def rewrite(oldName: String, newName:String): HeaderEffect = { + if (newName.isEmpty){ + Remove(oldName) + } else { + Rewrite(oldName,newName) + } + } + def remove(oldName:String):HeaderEffect = Remove(oldName) +} diff --git a/src/main/scala/com/twitter/diffy/proxy/http/filter/RefineHttpHeadersByLabelFilter.scala b/src/main/scala/com/twitter/diffy/proxy/http/filter/RefineHttpHeadersByLabelFilter.scala new file mode 100644 index 000000000..ddcf63405 --- /dev/null +++ b/src/main/scala/com/twitter/diffy/proxy/http/filter/RefineHttpHeadersByLabelFilter.scala @@ -0,0 +1,33 @@ +package com.twitter.diffy.proxy.http.filter + +import com.twitter.finagle.Filter +import org.jboss.netty.handler.codec.http.{HttpRequest, HttpResponse} + +object RefineHttpHeadersByLabelFilter { + + def rewriteRule(prefix:String):HeaderRule = { + val prefixLength = prefix.length + def go(name:String) = { + if (name.startsWith(prefix)) { + Some(HeaderEffect.rewrite(name, name.substring(prefixLength))) + } else None + } + go + } + + def removeRule(prefix:String): HeaderRule = { + def go(name:String) = { + if (name.startsWith(prefix)) Some(HeaderEffect.remove(name)) + else None + } + go + } + + def apply(label: String, allLabels: Seq[String]): Filter[HttpRequest, HttpResponse, HttpRequest, HttpResponse] = { + val (inclusions, exclusion) = allLabels.partition(_.equalsIgnoreCase(label)) + val rules = + inclusions.map(_ + "_").map(rewriteRule) ++ + exclusion.map(_+ "_").map(removeRule) + RewriteHttpHeadersFilter(rules) + } +} diff --git a/src/main/scala/com/twitter/diffy/proxy/http/filter/RewriteHttpHeadersFilter.scala b/src/main/scala/com/twitter/diffy/proxy/http/filter/RewriteHttpHeadersFilter.scala new file mode 100644 index 000000000..e8f6c6afe --- /dev/null +++ b/src/main/scala/com/twitter/diffy/proxy/http/filter/RewriteHttpHeadersFilter.scala @@ -0,0 +1,29 @@ +package com.twitter.diffy.proxy.http.filter + +import com.twitter.finagle.Filter +import com.twitter.util.Future +import org.jboss.netty.handler.codec.http.{HttpHeaders, HttpResponse, HttpRequest} +import scala.collection.JavaConverters._ + +object RewriteHttpHeadersFilter { + + def apply(rules:Seq[HeaderRule]) : Filter[HttpRequest, HttpResponse, HttpRequest, HttpResponse] = { + def filter(req: HttpRequest):HttpRequest = { + val headers: HttpHeaders = req.headers + val effects = for { + name <- headers.names().asScala + effect <- rules.flatMap(_(name)).headOption + } yield { + effect + } + effects.foreach(_ (headers)) + req + } + + def go(req: HttpRequest, next: (HttpRequest => Future[HttpResponse])): Future[HttpResponse] ={ + next(filter(req)) + } + + Filter.mk(go) + } +} diff --git a/src/main/scala/com/twitter/diffy/proxy/http/filter/package.scala b/src/main/scala/com/twitter/diffy/proxy/http/filter/package.scala new file mode 100644 index 000000000..ba1f0a4e3 --- /dev/null +++ b/src/main/scala/com/twitter/diffy/proxy/http/filter/package.scala @@ -0,0 +1,5 @@ +package com.twitter.diffy.proxy.http + +package object filter { + type HeaderRule = String => Option[HeaderEffect] +} diff --git a/src/test/scala/com/twitter/diffy/proxy/http/filter/CloneHttpRequestFilterSpec.scala b/src/test/scala/com/twitter/diffy/proxy/http/filter/CloneHttpRequestFilterSpec.scala new file mode 100644 index 000000000..56eb525e0 --- /dev/null +++ b/src/test/scala/com/twitter/diffy/proxy/http/filter/CloneHttpRequestFilterSpec.scala @@ -0,0 +1,53 @@ +package com.twitter.diffy.proxy.http.filter; + +import com.twitter.diffy.ParentSpec +import com.twitter.diffy.util.TwitterFutures +import com.twitter.finagle.http.{Request, Response} +import com.twitter.finagle.{Filter, Service} +import com.twitter.util.Future +import org.jboss.netty.handler.codec.http._ +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class CloneHttpRequestFilterSpec extends ParentSpec with TwitterFutures{ + describe("CloneHttpRequestFilter"){ + val subject: Filter[HttpRequest, HttpResponse, HttpRequest, HttpResponse] = CloneHttpRequestFilter.apply() + + def mutateHeader(req: HttpRequest): Future[HttpResponse] = { + req.headers().add("mutation", "test") + Future.apply(Response(req)) + } + + val mutateHeaderService =subject.andThen(Service.mk(mutateHeader)) + + + describe("recieving a request with no headers"){ + def request:Request={ Request(HttpVersion.HTTP_1_1, HttpMethod.GET, "/") } + + it("must prevent services which mutate the output request header from affecting the input request"){ + val input: Request = request + whenReady(mutateHeaderService(input)){ _ => input.headers().names() mustNot contain("mutation") } + } + it("must preserve http method"){ + val input = request + whenReady(mutateHeaderService(input)){ _ => input.method mustBe HttpMethod.GET } + } + it("must preserve http version"){ + val input = request + whenReady(mutateHeaderService(input)){_ => input.version mustBe HttpVersion.HTTP_1_1 } + } + + it("must preserve content buffer"){ + val input = request + val expected = request.content.array() + whenReady(mutateHeaderService(input)){_ => input.content.array() mustBe expected } + } + it("must preserve chunkiness"){ + val input = request + whenReady(mutateHeaderService(input)){_ => input.isChunked() mustBe false} + + } + } + } +} diff --git a/src/test/scala/com/twitter/diffy/proxy/http/filter/RefineHttpHeadersByLabelSpec.scala b/src/test/scala/com/twitter/diffy/proxy/http/filter/RefineHttpHeadersByLabelSpec.scala new file mode 100644 index 000000000..5cbfc65b3 --- /dev/null +++ b/src/test/scala/com/twitter/diffy/proxy/http/filter/RefineHttpHeadersByLabelSpec.scala @@ -0,0 +1,80 @@ +package com.twitter.diffy.proxy.http.filter + +import com.twitter.diffy.ParentSpec +import com.twitter.diffy.util.TwitterFutures +import com.twitter.finagle.http.{Request, Response} +import com.twitter.finagle.{Filter, Service} +import com.twitter.util.Future +import org.jboss.netty.handler.codec.http.{HttpHeaders, HttpRequest, HttpResponse} +import org.junit.runner.RunWith +import org.scalatest.FunSpec +import org.scalatest.junit.JUnitRunner + +import scala.collection.JavaConverters._ + +trait HeaderTransformations extends ParentSpec with TwitterFutures { this: FunSpec => + + def returnHeaders(req: HttpRequest): Future[HttpResponse] = { + val response: Response = Response(req) + response.headers().add(req.headers()) + Future.apply(response) + } + + def anEchoServiceForHeaders(subject: Filter[HttpRequest, HttpResponse, HttpRequest, HttpResponse], + inputHeaders: Map[String,String], + expectedHeaders: Map[String,String]): Unit ={ + val service = subject.andThen(Service.mk(returnHeaders)) + + it ("must match headers"){ + val req1: HttpRequest = Request("/") + val addHeader = (k:String, v:String) => req1.headers().add(k, v) + inputHeaders.foreach( addHeader.tupled ) + + def headersToMap(httpHeaders:HttpHeaders): Map[String,String] = { + httpHeaders.iterator().asScala.map( e=>(e.getKey,e.getValue )).toMap + } + whenReady(service(req1).map( _.headers ).map(headersToMap))( _ mustBe expectedHeaders ) + } + } + + def aFilterThatTargetsLabel(label:String, labelEx1:String, labelEx2:String) : Unit ={ + describe(s"given a $label filter"){ + val subject: Filter[HttpRequest, HttpResponse, HttpRequest, HttpResponse] = + RefineHttpHeadersByLabelFilter(label, List(label, labelEx1, labelEx2)) + + describe("given empty headers"){ + it must behave like anEchoServiceForHeaders(subject, Map.empty, Map.empty) + } + describe("given a set of headers"){ + val headers=Map(("A", "B"), ("C-c", "D-d")) + it must behave like anEchoServiceForHeaders(subject, headers, headers) + } + describe(s"given an exact $label header"){ + it must behave like anEchoServiceForHeaders(subject, Map( (s"$label", "X")), Map((s"$label", "X"))) + } + describe(s"given a $label prefixed header"){ + it must behave like anEchoServiceForHeaders(subject, Map( (s"${label}_X", "X")), Map(("X", "X"))) + } + describe(s"given a $label prefixed header with no suffix"){ + it must behave like anEchoServiceForHeaders(subject, Map( (s"${label}_", "X")), Map.empty) + } + + describe(s"given a $labelEx1 prefixed header"){ + it must behave like anEchoServiceForHeaders(subject, Map( (s"${labelEx1}_X", "X")), Map.empty) + } + describe(s"given a $labelEx2 prefixed header"){ + it must behave like anEchoServiceForHeaders(subject, Map( (s"${labelEx2}_Y", "Y")), Map.empty) + } + + } + } + +} +@RunWith(classOf[JUnitRunner]) +class RefineHttpHeadersByLabelSpec extends ParentSpec with TwitterFutures with HeaderTransformations { + describe("RefineHttpHeadersByLabelFilter"){ + it must behave like aFilterThatTargetsLabel("primary", "candidate", "secondary") + it must behave like aFilterThatTargetsLabel("candidate", "primary", "secondary") + it must behave like aFilterThatTargetsLabel("secondary", "primary", "candidate") + } +} diff --git a/src/test/scala/com/twitter/diffy/util/TwitterFutures.scala b/src/test/scala/com/twitter/diffy/util/TwitterFutures.scala new file mode 100644 index 000000000..cbb00d461 --- /dev/null +++ b/src/test/scala/com/twitter/diffy/util/TwitterFutures.scala @@ -0,0 +1,21 @@ +package com.twitter.diffy.util + +import com.twitter.util.{Return, Throw} +import org.scalatest.concurrent.Futures + +trait TwitterFutures extends Futures { + + import scala.language.implicitConversions + + implicit def convertTwitterFuture[T](twitterFuture: com.twitter.util.Future[T]): FutureConcept[T] = + new FutureConcept[T] { + override def eitherValue: Option[Either[Throwable, T]] = { + twitterFuture.poll.map { + case Return(o) => Right(o) + case Throw(e) => Left(e) + } + } + override def isCanceled: Boolean = false + override def isExpired: Boolean = false + } +}