Posted to tcl by aspect at Sat Apr 16 04:00:34 GMT 2016view raw
- #! /usr/bin/env tclsh
- package require Tcl 8.6
- package require Thread
- namespace eval ::pipethread {}
- #namespace eval ::pipechan {}
- #proc pipechan {downThread thread upThread command args} {
- # puts stderr "pipechan: \[$downThread->$thread->$upThread\] $command { [string map [list "\n" "\\n"] $args] }"
- #
- # switch -- $command {
- # "initialize" {
- # set chan [lindex $args 0]
- # set mode [lindex $args 1]
- #
- # set ::pipechan::_data($thread) ""
- #
- # return [list initialize finalize watch read write blocking]
- # }
- # "finalize" {
- # set ::pipechan::_dataEof($upThread) 1
- #
- # return
- # }
- # "watch" {
- # return -code error "Not implemented"
- # }
- # "read" {
- # set chan [lindex $args 0]
- # set bytes [lindex $args 1]
- #
- # if {[info exists ::pipechan::_dataEof($thread)]} {
- # return ""
- # }
- #
- # set data [string range $::pipechan::_data($thread) 0 $bytes-1]
- # set ::pipechan::_data($thread) [string range $::pipechan::_data($thread) $bytes end]
- #
- # if {[string length $data] == 0} {
- #puts stderr "pipechan: \[$downThread->$thread->$upThread\] READ EAGAIN"
- # return -code error EAGAIN
- # }
- #
- # return $data
- # }
- # "write" {
- # set chan [lindex $args 0]
- # set data [lindex $args 1]
- #
- # append ::pipechan::_data($upThread) $data
- #
- # return
- # }
- # default {
- # return -code error "Unsupported action"
- # }
- # }
- #}
- proc ::pipethread::_cloneinterp {thread} {
- foreach var [info global] {
- if {[array exists ::$var]} {
- thread::send $thread [list array set $var [array get ::$var]]
- } else {
- thread::send $thread [list set $var [set ::$var]]
- }
- }
- foreach package [package names] {
- set present 0
- catch {
- package present $package
- set present 1
- }
- if {!$present} {
- continue
- }
- thread::send $thread [list package require $package]
- }
- foreach proc [info proc ::*] {
- set procArgs [list]
- foreach arg [info args $proc] {
- if {[info default $proc $arg argDefault]} {
- lappend procArgs [list $arg $argDefault]
- } else {
- lappend procArgs $arg
- }
- }
- thread::send $thread [list proc $proc $procArgs [info body $proc]]
- }
- }
- proc ::pipethread::debugLog err { return;puts stderr "$err" }
- proc ::pipethread::accumulate {fd var} {
- append $var [read $fd]
- }
- proc ::pipethread::pipe args {
- if {![info exists ::pipethread::handles]} {
- set ::pipethread::handles -1
- }
- incr ::pipethread::handles
- set handle $::pipethread::handles
- set cloneInterpCommand ::pipethread::_cloneinterp
- if {[lindex $args 0] eq "-cloneInterpCommand"} {
- set cloneInterpCommand [lindex $args 1]
- set args [lrange $args 2 end]
- }
- set toRun [list]
- for {set idx 0} {$idx < [llength $args]} {incr idx} {
- set start $idx
- set end [lsearch -start $start -exact $args "|"]
- if {$end == -1} {
- set end [llength $args]
- }
- set command [lrange $args $start $end-1]
- if {[llength $command] == 1 && [string index $command 0] == "\{" && [string index $command end] == "\}"} {
- set anonymous true
- set command [lindex $command 0]
- } else {
- set anonymous false
- }
- set pipe [chan pipe]
- set pipeReader [lindex $pipe 0]
- set pipeWriter [lindex $pipe 1]
- set thread [thread::create]
- lappend toRun [list thread $thread anonymous $anonymous command $command pipeReader $pipeReader pipeWriter $pipeWriter]
- set idx $end
- }
- set finalThreadReader $pipeReader
- fileevent $pipeReader readable [list ::pipethread::accumulate $finalThreadReader ::pipethread::returnValue_$handle]
- for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
- set info [lindex $toRun $idx]
- set downThread ""
- set upThread ""
- catch {
- set downThread [dict get [lindex $toRun $idx-1] thread]
- }
- catch {
- set upThread [dict get [lindex $toRun $idx+1] thread]
- }
- set inchan ""
- catch {
- set inchan [dict get [lindex $toRun $idx-1] pipeReader]
- }
- set outchan [dict get $info pipeWriter]
- set thread [dict get $info thread]
- set anonymous [dict get $info anonymous]
- set command [dict get $info command]
- if {$inchan ne ""} {
- thread::transfer $thread $inchan
- }
- if {$outchan ne ""} {
- thread::transfer $thread $outchan
- }
- if {$cloneInterpCommand ne ""} {
- $cloneInterpCommand $thread
- }
- set iochan [thread::send $thread [list apply {{inchan outchan} {
- if {$inchan eq ""} {return $outchan}
- if {$outchan eq ""} {return $inchan}
- namespace eval tie {
- proc initialize {in out x mode} {
- info procs
- }
- proc finalize {in out x} { }
- proc write {in out x data} {
- puts -nonewline $out $data
- string length $data
- }
- proc read {in out x len} {
- ::read $in $len
- }
- proc watch {x eventspec} { puts "WATCH: $x $eventspec" }
- namespace export *
- namespace ensemble create -parameters {in out}
- }
- set it [chan create {r w} [list tie $inchan $outchan]]
- chan configure $it -blocking 1 -buffering none
- return $it
- }} $inchan $outchan]]
- if {!$anonymous} {
- lappend command $iochan
- } else {
- set command "set iochan [list $iochan]\n$command"
- }
- set threadToChan($thread) [list $iochan]
- debugLog "\[$thread\]: -EXEC-> [string map [list "\n" "\\n"] $command]"
- thread::send -async $thread $command ::pipethread::resultArray_${handle}($thread)
- }
- for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
- debugLog "\[---\]: WAITING"
- vwait ::pipethread::resultArray_$handle
- set thread [lindex [array names ::pipethread::resultArray_$handle] 0]
- unset ::pipethread::resultArray_${handle}($thread)
- debugLog "\[$thread\]: <-EXIT-"
- thread::send -async $thread [list foreach chan $threadToChan($thread) { if {$chan ne ""} { close $chan } }]
- debugLog "\[$thread\]: <-RELEASE-"
- thread::release -wait $thread
- debugLog "\[$thread\]: <-DONE-"
- }
- ::pipethread::accumulate $finalThreadReader ::pipethread::returnValue_$handle
- if {[info exists pipethread::returnValue_$handle]} {
- return [set ::pipethread::returnValue_$handle]
- }
- return
- }
- proc a {iochan} {
- puts "A: STARTING"
- puts $iochan a
- puts $iochan b
- puts $iochan what
- puts $iochan whatwhat
- puts $iochan c
- puts "A: ENDING"
- }
- proc b {arg iochan} {
- puts "B: STARTING"
- while true {
- gets $iochan line
- if {[eof $iochan] && $line == ""} {
- break
- }
- if {![string match "$arg*" $line]} {
- continue
- }
- puts $iochan $line
- puts "B: SENDING: $line"
- }
- puts "B: ENDING"
- }
- proc c {iochan} {
- puts "C: STARTING"
- gets $iochan line
- puts "Got: $line"
- puts "C: ENDING"
- }
- set what what
- puts [pipethread::pipe { puts $iochan ok }]
- ::pipethread::pipe a | b $what | c
- puts "---"
- ::pipethread::pipe a | { puts $iochan ok } | c
- puts [pipethread::pipe { puts $iochan "a"; puts $iochan "b"; puts $iochan "c" } | { foreach item [lreverse [read $iochan]] { puts $iochan $item.1 } }]
- # ---
- # Output:
- # A: STARTING
- # A: ENDING
- # B: STARTING
- # C: STARTING
- # B: SENDING: what
- # B: SENDING: whatwhat
- # B: ENDING
- # Got: what
- # C: ENDING
- # ---
- # A: STARTING
- # A: ENDING
- # C: STARTING
- # Got: ok
- # C: ENDING