# Posted to tcl by aspect at Sat Apr 16 04:00:34 GMT 2016 (view pretty)
#! /usr/bin/env tclsh

# pipethread: run a "pipeline" of Tcl commands, one thread per stage, with the
# stages connected to each other through OS pipes (chan pipe).  The text that
# the last stage writes to its channel is returned by ::pipethread::pipe.

package require Tcl 8.6
package require Thread

namespace eval ::pipethread {}

# Earlier, disabled implementation based on a hand-written reflected channel.
# Kept for reference only.
#namespace eval ::pipechan {}
#proc pipechan {downThread thread upThread command args} {
#    puts stderr "pipechan: \[$downThread->$thread->$upThread\] $command { [string map [list "\n" "\\n"] $args] }"
#
#    switch -- $command {
#        "initialize" {
#            set chan [lindex $args 0]
#            set mode [lindex $args 1]
#
#            set ::pipechan::_data($thread) ""
#
#            return [list initialize finalize watch read write blocking]
#        }
#        "finalize" {
#            set ::pipechan::_dataEof($upThread) 1
#
#            return
#        }
#        "watch" {
#            return -code error "Not implemented"
#        }
#        "read" {
#            set chan [lindex $args 0]
#            set bytes [lindex $args 1]
#
#            if {[info exists ::pipechan::_dataEof($thread)]} {
#                return ""
#            }
#
#            set data [string range $::pipechan::_data($thread) 0 $bytes-1]
#            set ::pipechan::_data($thread) [string range $::pipechan::_data($thread) $bytes end]
#
#            if {[string length $data] == 0} {
#                #puts stderr "pipechan: \[$downThread->$thread->$upThread\] READ EAGAIN"
#                return -code error EAGAIN
#            }
#
#            return $data
#        }
#        "write" {
#            set chan [lindex $args 0]
#            set data [lindex $args 1]
#
#            append ::pipechan::_data($upThread) $data
#
#            return
#        }
#        default {
#            return -code error "Unsupported action"
#        }
#    }
#}

# ::pipethread::_cloneinterp --
#
#   Default "clone" command: copies the calling interpreter's global
#   variables, currently-present packages and global procs into $thread by
#   sending the equivalent definitions with thread::send.
#
# Arguments:
#   thread   Id of the target thread (as returned by thread::create).
proc ::pipethread::_cloneinterp {thread} {
    # Global variables (arrays and scalars).
    foreach var [info global] {
        if {[array exists ::$var]} {
            thread::send $thread [list array set $var [array get ::$var]]
        } else {
            thread::send $thread [list set $var [set ::$var]]
        }
    }

    # Only packages that are actually loaded here are required over there.
    foreach package [package names] {
        if {[catch {package present $package}]} {
            continue
        }
        thread::send $thread [list package require $package]
    }

    # Procs defined in the global namespace, including default arguments.
    foreach proc [info procs ::*] {
        set procArgs [list]
        foreach arg [info args $proc] {
            if {[info default $proc $arg argDefault]} {
                lappend procArgs [list $arg $argDefault]
            } else {
                lappend procArgs $arg
            }
        }
        thread::send $thread [list proc $proc $procArgs [info body $proc]]
    }
}

# ::pipethread::debugLog --
#
#   Writes $err to stderr when the variable ::pipethread::debug exists and is
#   true; otherwise does nothing (the default).  Always returns "".
proc ::pipethread::debugLog err {
    if {[info exists ::pipethread::debug] && $::pipethread::debug} {
        puts stderr "$err"
    }
    return
}

# ::pipethread::accumulate --
#
#   Appends whatever can currently be read from $fd to the variable named
#   $var.  Once $fd has reached end-of-file its readable handler is removed so
#   the event loop does not keep firing on a drained channel.
#
# Arguments:
#   fd    Channel to read from.
#   var   Name of the variable to append to (callers pass a fully-qualified
#         name).
#
# Results:
#   The accumulated value of the variable.
proc ::pipethread::accumulate {fd var} {
    set accumulated [append $var [read $fd]]
    if {[chan eof $fd]} {
        chan event $fd readable {}
    }
    return $accumulated
}

# ::pipethread::pipe --
#
#   Runs a pipeline of stages separated by "|" arguments.  Each stage runs in
#   its own thread.  A stage is either
#     * a command prefix, to which the stage's I/O channel is appended, or
#     * a single braced script, which is run with the variable "iochan" set
#       to the stage's I/O channel.
#   Reading from the I/O channel yields the previous stage's output; writing
#   to it feeds the next stage (or, for the last stage, the return value).
#
# Arguments:
#   ?-cloneInterpCommand cmd?   Command called as "cmd threadId" to prepare
#                               each new thread; "" disables preparation.
#                               Defaults to ::pipethread::_cloneinterp.
#   stage ?| stage ...?         The pipeline.
#
# Results:
#   Everything the last stage wrote to its channel.
proc ::pipethread::pipe args {
    set cloneInterpCommand ::pipethread::_cloneinterp
    if {[lindex $args 0] eq "-cloneInterpCommand"} {
        set cloneInterpCommand [lindex $args 1]
        set args [lrange $args 2 end]
    }

    if {[llength $args] == 0} {
        return -code error "wrong # args: should be \"pipethread::pipe ?-cloneInterpCommand command? stage ?| stage ...?\""
    }

    # Per-call handle, used to build unique names for the result variables.
    if {![info exists ::pipethread::handles]} {
        set ::pipethread::handles -1
    }
    incr ::pipethread::handles
    set handle $::pipethread::handles
    set resultVar ::pipethread::returnValue_$handle
    set resultArray ::pipethread::resultArray_$handle

    # Split the arguments on "|" and create one thread + one pipe per stage.
    set toRun [list]
    for {set idx 0} {$idx < [llength $args]} {incr idx} {
        set start $idx
        set end [lsearch -start $start -exact $args "|"]
        if {$end == -1} {
            set end [llength $args]
        }
        set command [lrange $args $start $end-1]

        # A stage consisting of exactly one word whose list form is braced is
        # treated as an anonymous script rather than a command prefix.
        if {[llength $command] == 1 && [string index $command 0] eq "\{" && [string index $command end] eq "\}"} {
            set anonymous true
            set command [lindex $command 0]
        } else {
            set anonymous false
        }

        lassign [chan pipe] pipeReader pipeWriter
        set thread [thread::create]

        lappend toRun [list thread $thread anonymous $anonymous command $command pipeReader $pipeReader pipeWriter $pipeWriter]

        set idx $end
    }

    # The read side of the last stage's pipe stays in this thread.  It is
    # non-blocking while events are being pumped: a blocking read-to-EOF from
    # the readable handler would stall this thread before it could process the
    # stage's completion event, and EOF only arrives after that event has
    # been handled.
    set finalThreadReader $pipeReader
    chan configure $finalThreadReader -blocking 0
    chan event $finalThreadReader readable [list ::pipethread::accumulate $finalThreadReader $resultVar]

    # Hand each stage its channels and start it.
    for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
        set stage [lindex $toRun $idx]

        # Input is the read side of the previous stage's pipe (none for the
        # first stage); output is the write side of this stage's own pipe.
        set inchan ""
        if {$idx > 0} {
            set inchan [dict get [lindex $toRun $idx-1] pipeReader]
        }
        set outchan [dict get $stage pipeWriter]

        set thread [dict get $stage thread]
        set anonymous [dict get $stage anonymous]
        set command [dict get $stage command]

        if {$inchan ne ""} {
            thread::transfer $thread $inchan
        }
        if {$outchan ne ""} {
            thread::transfer $thread $outchan
        }

        if {$cloneInterpCommand ne ""} {
            $cloneInterpCommand $thread
        }

        # Inside the stage's thread: when both directions exist, build one
        # read/write reflected channel that reads from inchan and writes to
        # outchan; otherwise hand back the single raw channel.
        set iochan [thread::send $thread [list apply {{inchan outchan} {
            if {$inchan eq ""} {return $outchan}
            if {$outchan eq ""} {return $inchan}
            namespace eval tie {
                # The ensemble is created with -parameters {in out}, so every
                # handler receives "in out" ahead of the reflected-channel
                # arguments.
                proc initialize {in out x mode} {
                    info procs
                }
                proc finalize {in out x} {
                }
                proc write {in out x data} {
                    puts -nonewline $out $data
                    string length $data
                }
                proc read {in out x len} {
                    ::read $in $len
                }
                proc watch {in out x eventspec} {
                    puts "WATCH: $x $eventspec"
                }
                namespace export *
                namespace ensemble create -parameters {in out}
            }
            set it [chan create {r w} [list tie $inchan $outchan]]
            chan configure $it -blocking 1 -buffering none
            return $it
        }} $inchan $outchan]]

        if {!$anonymous} {
            lappend command $iochan
        } else {
            set command "set iochan [list $iochan]\n$command"
        }

        set threadToChan($thread) [list $iochan]

        debugLog "\[$thread\]: -EXEC-> [string map [list "\n" "\\n"] $command]"
        thread::send -async $thread $command ${resultArray}($thread)
    }

    # Reap the stages in completion order.
    for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
        debugLog "\[---\]: WAITING"
        # Only wait when no finished stage is already recorded.
        while {[array size $resultArray] == 0} {
            vwait $resultArray
        }

        set thread [lindex [array names $resultArray] 0]
        unset ${resultArray}($thread)

        debugLog "\[$thread\]: <-EXIT-"
        thread::send -async $thread [list foreach chan $threadToChan($thread) {
            if {$chan ne ""} {
                close $chan
            }
        }]

        debugLog "\[$thread\]: <-RELEASE-"
        thread::release -wait $thread
        debugLog "\[$thread\]: <-DONE-"
    }
    unset -nocomplain $resultArray

    # All stages have been released; drain what is left up to EOF, then
    # release the reader and the per-call result variable.
    chan configure $finalThreadReader -blocking 1
    ::pipethread::accumulate $finalThreadReader $resultVar
    close $finalThreadReader

    if {[info exists $resultVar]} {
        set result [set $resultVar]
        unset $resultVar
        return $result
    }

    return
}

# Demo stage: writes a fixed set of lines to its channel.
proc a {iochan} {
    puts "A: STARTING"
    puts $iochan a
    puts $iochan b
    puts $iochan what
    puts $iochan whatwhat
    puts $iochan c
    puts "A: ENDING"
}

# Demo stage: forwards only the input lines that start with $arg.
proc b {arg iochan} {
    puts "B: STARTING"
    while true {
        gets $iochan line
        if {[eof $iochan] && $line eq ""} {
            break
        }

        if {![string match "$arg*" $line]} {
            continue
        }

        puts $iochan $line
        puts "B: SENDING: $line"
    }
    puts "B: ENDING"
}

# Demo stage: reads one line and prints it.
proc c {iochan} {
    puts "C: STARTING"
    gets $iochan line
    puts "Got: $line"
    puts "C: ENDING"
}

set what what

puts [pipethread::pipe {
    puts $iochan ok
}]

::pipethread::pipe a | b $what | c

puts "---"

::pipethread::pipe a | {
    puts $iochan ok
} | c

puts [pipethread::pipe {
    puts $iochan "a"; puts $iochan "b"; puts $iochan "c"
} | {
    foreach item [lreverse [read $iochan]] {
        puts $iochan $item.1
    }
}]

# ---
# Output:
# A: STARTING
# A: ENDING
# B: STARTING
# C: STARTING
# B: SENDING: what
# B: SENDING: whatwhat
# B: ENDING
# Got: what
# C: ENDING
# ---
# A: STARTING
# A: ENDING
# C: STARTING
# Got: ok
# C: ENDING