Skip to content

Commit

Permalink
add process library with extracted and new functions
Browse files Browse the repository at this point in the history
  • Loading branch information
cvogt committed Jun 16, 2017
1 parent c65d21a commit b5194aa
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 112 deletions.
2 changes: 1 addition & 1 deletion build/build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Build(val context: Context) extends Shared with Scalariform with PublishLo
super.dependencies ++ Resolver(mavenCentral).bind(
MavenDependency("org.eclipse.jgit", "org.eclipse.jgit", "4.2.0.201601211800-r"),
ScalaDependency("org.scala-lang.modules","scala-xml",constants.scalaXmlVersion)
) :+ libraries.cbt.reflect :+ libraries.cbt.eval
) :+ libraries.cbt.reflect :+ libraries.cbt.eval :+ libraries.cbt.process
}

override def sources = Seq(
Expand Down
10 changes: 10 additions & 0 deletions libraries/process/build/build.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package cbt_build.process
import cbt._
import cbt_internal._
class Build(val context: Context) extends Library{
override def inceptionYear = 2017
override def description = "helpers for process calls"
override def dependencies = super.dependencies ++ Resolver(mavenCentral).bind(
MavenDependency( "net.java.dev.jna", "jna-platform", "4.4.0" )
) :+ libraries.cbt.common_1
}
5 changes: 5 additions & 0 deletions libraries/process/build/build/build.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package cbt_build.reflect.build
import cbt._
class Build(val context: Context) extends BuildBuild with CbtInternal{
override def dependencies = super.dependencies :+ cbtInternal.library
}
175 changes: 175 additions & 0 deletions libraries/process/process.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package cbt.process
import cbt.ExitCode
import java.io._

object `package` extends Module

trait Module {
def runMainForked(
className: String,
args: Seq[String],
classpath: String,
directory: Option[File],
outErrIn: Option[( OutputStream, OutputStream, InputStream )]
): ( Int, () => ExitCode, () => ExitCode ) = {
// FIXME: Windows support
val java_exe = new File( System.getProperty( "java.home" ) + "/bin/java" )
runWithIO(
java_exe.toString +: "-cp" +: classpath +: className +: args,
directory,
outErrIn
)
}

def runWithIO(
commandLine: Seq[String],
directory: Option[File],
outErrIn: Option[( OutputStream, OutputStream, InputStream )]
): ( Int, () => ExitCode, () => ExitCode ) = {
val pb = new ProcessBuilder( commandLine: _* )
outErrIn.map {
case ( out, err, in ) =>
val process = directory.map( pb.directory( _ ) ).getOrElse( pb )
.redirectInput( ProcessBuilder.Redirect.PIPE )
.redirectOutput( ProcessBuilder.Redirect.PIPE )
.redirectError( ProcessBuilder.Redirect.PIPE )
.start

(
processId( process ),
() => {
val lock = new AnyRef

val t1 = asyncPipeCharacterStreamSyncLines( process.getErrorStream, err, lock )
val t2 = asyncPipeCharacterStreamSyncLines( process.getInputStream, out, lock )
val t3 = asyncPipeCharacterStream( System.in, process.getOutputStream, process.isAlive )

t1.start
t2.start
t3.start

t1.join
t2.join

val e = process.waitFor
System.err.println( scala.Console.RESET + "Please press ENTER to continue..." )
t3.join
ExitCode( e )
},
() => {
process.destroy
Thread.sleep( 20 )
ExitCode( process.destroyForcibly.waitFor )
}
)
}.getOrElse {
val process = pb.inheritIO.start
(
processId( process ),
() => ExitCode( process.waitFor ),
() => {
process.destroy
Thread.sleep( 20 )
ExitCode( process.destroyForcibly.waitFor )
}
)
}
}

private def accessField( cls: Class[_], field: String ): java.lang.reflect.Field = {
val f = cls.getDeclaredField( field )
f.setAccessible( true )
f
}

import com.sun.jna.{ Library, Native }
private trait CLibrary extends Library {
def getpid: Int
}
private val CLibraryInstance: CLibrary = Native.loadLibrary( "c", classOf[CLibrary] ).asInstanceOf[CLibrary]

def currentProcessId: Int = {
if ( Option( System.getProperty( "os.name" ) ).exists( _.startsWith( "Windows" ) ) ) {
com.sun.jna.platform.win32.Kernel32.INSTANCE.GetCurrentProcessId
} else {
CLibraryInstance.getpid
}
}

/** process id of given Process */
def processId( process: Process ): Int = {
val clsName = process.getClass.getName
if ( clsName == "java.lang.UNIXProcess" ) {
accessField( process.getClass, "pid" ).getInt( process )
} else if ( clsName == "java.lang.Win32Process" || clsName == "java.lang.ProcessImpl" ) {
import com.sun.jna.platform.win32.{ WinNT, Kernel32 }
val handle = new WinNT.HANDLE
handle.setPointer(
com.sun.jna.Pointer.createConstant(
accessField( process.getClass, "handle" ).getLong( process )
)
)
Kernel32.INSTANCE.GetProcessId( handle )
} else {
throw new Exception( "Unexpected Process sub-class: " + clsName )
}
}

def asyncPipeCharacterStreamSyncLines( inputStream: InputStream, outputStream: OutputStream, lock: AnyRef ): Thread = {
new Thread(
new Runnable {
def run = {
val b = new BufferedInputStream( inputStream )
Iterator.continually {
b.read // block until and read next character
}.takeWhile( _ != -1 ).map { c =>
lock.synchronized { // synchronize with other invocations
outputStream.write( c )
Iterator
.continually( b.read )
.takeWhile( _ != -1 )
.map { c =>
try {
outputStream.write( c )
outputStream.flush
(
c != '\n' // release lock when new line was encountered, allowing other writers to slip in
&& b.available > 0 // also release when nothing is available to not block other outputs
)
} catch {
case e: IOException if e.getMessage == "Stream closed" => false
}
}
.takeWhile( identity )
.length // force entire iterator
}
}.length // force entire iterator
}
}
)
}

def asyncPipeCharacterStream( inputStream: InputStream, outputStream: OutputStream, continue: => Boolean ) = {
new Thread(
new Runnable {
def run = {
Iterator
.continually { inputStream.read }
.takeWhile( _ != -1 )
.map { c =>
try {
outputStream.write( c )
outputStream.flush
true
} catch {
case e: IOException if e.getMessage == "Stream closed" => false
}
}
.takeWhile( identity )
.takeWhile( _ => continue )
.length // force entire iterator
}
}
)
}
}
7 changes: 7 additions & 0 deletions libraries/process/test/test.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
object Tests{
def main(args: Array[String]): Unit = {
val pb = new ProcessBuilder("cat")
val p = pb.start
cbt.process.getProcessId( p ) // checks that it actually gets a process id
}
}
1 change: 1 addition & 0 deletions stage1/Stage1.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ object Stage1{
stage2.listFiles
++ (stage2 / "plugins").listOrFail
++ (cbtHome / "libraries" / "eval").listOrFail
++ (cbtHome / "libraries" / "process").listOrFail
).filter(_.isFile).filter(_.toString.endsWith(".scala"))

val cls = this.getClass.getClassLoader.loadClass("cbt.NailgunLauncher")
Expand Down
93 changes: 0 additions & 93 deletions stage1/Stage1Lib.scala
Original file line number Diff line number Diff line change
Expand Up @@ -432,99 +432,6 @@ ${sourceFiles.sorted.mkString(" \\\n")}
outputLastModified
)
}

def asyncPipeCharacterStreamSyncLines( inputStream: InputStream, outputStream: OutputStream, lock: AnyRef ): Thread = {
new Thread(
new Runnable{
def run = {
val b = new BufferedInputStream( inputStream )
Iterator.continually{
b.read // block until and read next character
}.takeWhile(_ != -1).map{ c =>
lock.synchronized{ // synchronize with other invocations
outputStream.write(c)
Iterator
.continually( b.read )
.takeWhile( _ != -1 )
.map{ c =>
try{
outputStream.write(c)
outputStream.flush
(
c != '\n' // release lock when new line was encountered, allowing other writers to slip in
&& b.available > 0 // also release when nothing is available to not block other outputs
)
} catch {
case e: IOException if e.getMessage == "Stream closed" => false
}
}
.takeWhile(identity)
.length // force entire iterator
}
}.length // force entire iterator
}
}
)
}

def asyncPipeCharacterStream( inputStream: InputStream, outputStream: OutputStream, continue: => Boolean ) = {
new Thread(
new Runnable{
def run = {
Iterator
.continually{ inputStream.read }
.takeWhile(_ != -1)
.map{ c =>
try{
outputStream.write(c)
outputStream.flush
true
} catch {
case e: IOException if e.getMessage == "Stream closed" => false
}
}
.takeWhile( identity )
.takeWhile( _ => continue )
.length // force entire iterator
}
}
)
}

def runWithIO( commandLine: Seq[String], directory: Option[File] = None ): ExitCode = {
val (out,err,in) = lib.getOutErrIn match { case (l,r, in) => (l.get,r.get, in) }
val pb = new ProcessBuilder( commandLine: _* )
val exitCode =
if( !NailgunLauncher.runningViaNailgun ){
pb.inheritIO.start.waitFor
} else {
val process = directory.map( pb.directory( _ ) ).getOrElse( pb )
.redirectInput(ProcessBuilder.Redirect.PIPE)
.redirectOutput(ProcessBuilder.Redirect.PIPE)
.redirectError(ProcessBuilder.Redirect.PIPE)
.start

val lock = new AnyRef

val t1 = lib.asyncPipeCharacterStreamSyncLines( process.getErrorStream, err, lock )
val t2 = lib.asyncPipeCharacterStreamSyncLines( process.getInputStream, out, lock )
val t3 = lib.asyncPipeCharacterStream( System.in, process.getOutputStream, process.isAlive )

t1.start
t2.start
t3.start

t1.join
t2.join

val e = process.waitFor
System.err.println( scala.Console.RESET + "Please press ENTER to continue..." )
t3.join
e
}

ExitCode( exitCode )
}
}

import scala.reflect._
Expand Down
26 changes: 9 additions & 17 deletions stage1/resolver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,16 @@ trait DependencyImplementation extends Dependency{
)
}
*/
def fork = false

def runMain( className: String, args: Seq[String] ): ExitCode = {
if(fork){
val java_exe = new File(System.getProperty("java.home")) / "bin" / "java"
lib.runWithIO(
java_exe.string +: "-cp" +: classpath.string +: className +: args
)
} else {
lib.getMain( classLoader.loadClass( className ) )( args )
}
}
def runMain( className: String, args: Seq[String] ): ExitCode =
lib.getMain( classLoader.loadClass( className ) )( args )

def runMain( args: Seq[String] ): ExitCode = {
val c = mainClass.getOrElse(
throw new RuntimeException( "No main class found in " + this )
)
runMain( c.getName, args )
}
def runMain( args: Seq[String] ): ExitCode =
runMain( mainClassOrFail.getName, args )

def mainClassOrFail = mainClass.getOrElse(
throw new RuntimeException( "No main class found in " + this )
)

def mainClass = lib.pickOne(
"Which one do you want to run?",
Expand Down Expand Up @@ -209,6 +200,7 @@ case class CbtDependencies(cbtLastModified: Long, mavenCache: File, nailgunTarge
stage1Dependency +:
MavenResolver(cbtLastModified, mavenCache,mavenCentral).bind(
MavenDependency("org.eclipse.jgit", "org.eclipse.jgit", "4.2.0.201601211800-r"),
MavenDependency("net.java.dev.jna", "jna-platform", "4.4.0"),
MavenDependency("org.scala-lang","scala-compiler",constants.scalaVersion)
)
)
Expand Down
28 changes: 28 additions & 0 deletions stage2/BasicBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -319,4 +319,32 @@ trait BaseBuild extends BuildInterface with DependencyImplementation with SbtDep
final def crossScalaVersionsArray = Array(scalaVersion)

def publish: Seq[URL] = Seq()

def fork = false

def runForked: ExitCode = {
val ( pid, waitFor, destroy ) = runForkedHandles
waitFor()
}

protected def runForkedHandles = runForked( mainClassOrFail.getName, context.args )

def runForked( className: String, args: Seq[String] ): ( Int, () => ExitCode, () => ExitCode ) =
lib.runMainForked(
className,
args,
classpath.string,
Some( context.workingDirectory ),
NailgunLauncher.runningViaNailgun.option(
lib.getOutErrIn match { case (l,r, in) => (l.get,r.get, in) }
)
)

override def runMain( className: String, args: Seq[String] ): ExitCode = {
if(fork){
runForked(className, args)._2()
} else {
super.runMain( className, args )
}
}
}
Loading

0 comments on commit b5194aa

Please sign in to comment.