Posted to tcl by aspect at Sat Apr 16 04:00:34 GMT 2016view pretty

#! /usr/bin/env tclsh

package require Tcl 8.6
package require Thread

namespace eval ::pipethread {}

#namespace eval ::pipechan {}
#proc pipechan {downThread thread upThread command args} {
#   puts stderr "pipechan: \[$downThread->$thread->$upThread\] $command { [string map [list "\n" "\\n"] $args] }"
#
#   switch -- $command {
#       "initialize" {
#           set chan [lindex $args 0]
#           set mode [lindex $args 1]
#
#           set ::pipechan::_data($thread) ""
#
#           return [list initialize finalize watch read write blocking]
#       }
#       "finalize" {
#           set ::pipechan::_dataEof($upThread) 1
#
#           return
#       }
#       "watch" {
#           return -code error "Not implemented"
#       }
#       "read" {
#           set chan [lindex $args 0]
#           set bytes [lindex $args 1]
#
#           if {[info exists ::pipechan::_dataEof($thread)]} {
#               return ""
#           }
#
#           set data [string range $::pipechan::_data($thread) 0 $bytes-1]
#           set ::pipechan::_data($thread) [string range $::pipechan::_data($thread) $bytes end]
#
#           if {[string length $data] == 0} {
#puts stderr "pipechan: \[$downThread->$thread->$upThread\] READ EAGAIN"
#               return -code error EAGAIN
#           }
#
#           return $data
#       }
#       "write" {
#           set chan [lindex $args 0]
#           set data [lindex $args 1]
#
#           append ::pipechan::_data($upThread) $data
#
#           return
#       }
#       default {
#           return -code error "Unsupported action"
#       }
#   }
#}

proc ::pipethread::_cloneinterp {thread} {
    foreach var [info global] {
        if {[array exists ::$var]} {
            thread::send $thread [list array set $var [array get ::$var]]
        } else {
            thread::send $thread [list set $var [set ::$var]]
        }
    }

    foreach package [package names] {
        set present 0
        catch {
            package present $package
            set present 1
        }

        if {!$present} {
            continue
        }

        thread::send $thread [list package require $package]
    }

    foreach proc [info proc ::*] {
        set procArgs [list]
        foreach arg [info args $proc] {
            if {[info default $proc $arg argDefault]} {
                lappend procArgs [list $arg $argDefault]
            } else {
                lappend procArgs $arg
            }
        }

        thread::send $thread [list proc $proc $procArgs [info body $proc]]
    }
}

proc ::pipethread::debugLog err { return;puts stderr "$err" }

proc ::pipethread::accumulate {fd var} {
    append $var [read $fd]
}

proc ::pipethread::pipe args {
    if {![info exists ::pipethread::handles]} {
        set ::pipethread::handles -1
    }

    incr ::pipethread::handles

    set handle $::pipethread::handles

    set cloneInterpCommand ::pipethread::_cloneinterp
    if {[lindex $args 0] eq "-cloneInterpCommand"} {
        set cloneInterpCommand [lindex $args 1]

        set args [lrange $args 2 end]
    }

    set toRun [list]
    for {set idx 0} {$idx < [llength $args]} {incr idx} {
        set start $idx

        set end [lsearch -start $start -exact $args "|"]

        if {$end == -1} {
            set end [llength $args]
        }

        set command [lrange $args $start $end-1]

        if {[llength $command] == 1 && [string index $command 0] == "\{" && [string index $command end] == "\}"} {
            set anonymous true

            set command [lindex $command 0]
        } else {
            set anonymous false
        }

        set pipe [chan pipe]

        set pipeReader [lindex $pipe 0]
        set pipeWriter [lindex $pipe 1]

        set thread [thread::create]

        lappend toRun [list thread $thread anonymous $anonymous command $command pipeReader $pipeReader pipeWriter $pipeWriter]

        set idx $end
    }

    set finalThreadReader $pipeReader

    fileevent $pipeReader readable [list ::pipethread::accumulate $finalThreadReader ::pipethread::returnValue_$handle]

    for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
        set info [lindex $toRun $idx]

        set downThread ""
        set upThread ""
        catch {
            set downThread [dict get [lindex $toRun $idx-1] thread]
        }
        catch {
            set upThread [dict get [lindex $toRun $idx+1] thread]
        }

        set inchan ""
        catch {
            set inchan [dict get [lindex $toRun $idx-1] pipeReader]
        }
        set outchan [dict get $info pipeWriter]

        set thread [dict get $info thread]
        set anonymous [dict get $info anonymous]
        set command [dict get $info command]

        if {$inchan ne ""} {
            thread::transfer $thread $inchan
        }

        if {$outchan ne ""} {
            thread::transfer $thread $outchan
        }

        if {$cloneInterpCommand ne ""} {
            $cloneInterpCommand $thread
        }

        set iochan [thread::send $thread [list apply {{inchan outchan} {

                if {$inchan eq ""} {return $outchan}
                if {$outchan eq ""} {return $inchan}

                namespace eval tie {
                    proc initialize {in out x mode}    {
                        info procs
                    }
                    proc finalize {in out x}           { }
                    proc write {in out x data}         {
                        puts -nonewline $out $data
                        string length $data
                    }
                    proc read {in out x len}           {
                        ::read $in $len
                    }
                    proc watch {x eventspec}    { puts "WATCH: $x $eventspec" }
                    namespace export *
                    namespace ensemble create -parameters {in out}
                }
                set it [chan create {r w} [list tie $inchan $outchan]]
                chan configure $it -blocking 1 -buffering none
                return $it

        }} $inchan $outchan]]

        if {!$anonymous} {
            lappend command $iochan
        } else {
            set command "set iochan [list $iochan]\n$command"
        }

        set threadToChan($thread) [list $iochan]

        debugLog "\[$thread\]: -EXEC-> [string map [list "\n" "\\n"] $command]"
        thread::send -async $thread $command ::pipethread::resultArray_${handle}($thread)
    }

    for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
        debugLog "\[---\]: WAITING"
        vwait ::pipethread::resultArray_$handle

        set thread [lindex [array names ::pipethread::resultArray_$handle] 0]
        unset ::pipethread::resultArray_${handle}($thread)

        debugLog "\[$thread\]: <-EXIT-"

        thread::send -async $thread [list foreach chan $threadToChan($thread) { if {$chan ne ""} { close $chan } }]

        debugLog "\[$thread\]: <-RELEASE-"

        thread::release -wait $thread

        debugLog "\[$thread\]: <-DONE-"
    }

    ::pipethread::accumulate $finalThreadReader ::pipethread::returnValue_$handle

    if {[info exists pipethread::returnValue_$handle]} {
        return [set ::pipethread::returnValue_$handle]
    }

    return
}

proc a {iochan} {
    puts "A: STARTING"
    puts $iochan a
    puts $iochan b
    puts $iochan what
    puts $iochan whatwhat
    puts $iochan c
    puts "A: ENDING"
}

proc b {arg iochan} {
    puts "B: STARTING"
    while true {
        gets $iochan line
        if {[eof $iochan] && $line == ""} {
            break
        }

        if {![string match "$arg*" $line]} {
            continue
        }

        puts $iochan $line
        puts "B: SENDING: $line"
    }
    puts "B: ENDING"
}

proc c {iochan} {
    puts "C: STARTING"
    gets $iochan line
    puts "Got: $line"
    puts "C: ENDING"
}

set what what

puts [pipethread::pipe { puts $iochan ok }]

::pipethread::pipe a | b $what | c
puts "---"
::pipethread::pipe a | { puts $iochan ok } | c
puts [pipethread::pipe { puts $iochan "a"; puts $iochan "b"; puts $iochan "c" } | { foreach item [lreverse [read $iochan]] { puts $iochan $item.1 } }]

# ---
# Output:
# A: STARTING
# A: ENDING
# B: STARTING
# C: STARTING
# B: SENDING: what
# B: SENDING: whatwhat
# B: ENDING
# Got: what
# C: ENDING
# ---
# A: STARTING
# A: ENDING
# C: STARTING
# Got: ok
# C: ENDING