Posted to tcl by aspect at Sat Apr 16 04:00:34 GMT 2016 (view raw)

  1. #! /usr/bin/env tclsh
  2.  
  3. package require Tcl 8.6
  4. package require Thread
  5.  
  6. namespace eval ::pipethread {}
  7.  
  8. #namespace eval ::pipechan {}
  9. #proc pipechan {downThread thread upThread command args} {
  10. # puts stderr "pipechan: \[$downThread->$thread->$upThread\] $command { [string map [list "\n" "\\n"] $args] }"
  11. #
  12. # switch -- $command {
  13. # "initialize" {
  14. # set chan [lindex $args 0]
  15. # set mode [lindex $args 1]
  16. #
  17. # set ::pipechan::_data($thread) ""
  18. #
  19. # return [list initialize finalize watch read write blocking]
  20. # }
  21. # "finalize" {
  22. # set ::pipechan::_dataEof($upThread) 1
  23. #
  24. # return
  25. # }
  26. # "watch" {
  27. # return -code error "Not implemented"
  28. # }
  29. # "read" {
  30. # set chan [lindex $args 0]
  31. # set bytes [lindex $args 1]
  32. #
  33. # if {[info exists ::pipechan::_dataEof($thread)]} {
  34. # return ""
  35. # }
  36. #
  37. # set data [string range $::pipechan::_data($thread) 0 $bytes-1]
  38. # set ::pipechan::_data($thread) [string range $::pipechan::_data($thread) $bytes end]
  39. #
  40. # if {[string length $data] == 0} {
  41. #puts stderr "pipechan: \[$downThread->$thread->$upThread\] READ EAGAIN"
  42. # return -code error EAGAIN
  43. # }
  44. #
  45. # return $data
  46. # }
  47. # "write" {
  48. # set chan [lindex $args 0]
  49. # set data [lindex $args 1]
  50. #
  51. # append ::pipechan::_data($upThread) $data
  52. #
  53. # return
  54. # }
  55. # default {
  56. # return -code error "Unsupported action"
  57. # }
  58. # }
  59. #}
  60.  
  # ::pipethread::_cloneinterp thread
  #
  # Default "clone" hook used by ::pipethread::pipe.  Replays the state of
  # the calling interpreter into the worker identified by `thread`, one
  # synchronous thread::send per item, in this order:
  #   1. every global variable (arrays via array get/array set, everything
  #      else via set),
  #   2. every package that is currently present here (package require),
  #   3. every proc that lives directly in the global namespace, including
  #      default values of its formal arguments.
  #
  # Returns the empty string.
  proc ::pipethread::_cloneinterp {thread} {
      # 1. Global variables.
      foreach name [info global] {
          set qualified ::$name
          if {[array exists $qualified]} {
              set script [list array set $name [array get $qualified]]
          } else {
              set script [list set $name [set $qualified]]
          }
          thread::send $thread $script
      }

      # 2. Packages: only the ones actually loaded in this interpreter.
      foreach pkg [package names] {
          if {[catch {package present $pkg}]} {
              continue
          }
          thread::send $thread [list package require $pkg]
      }

      # 3. Global-namespace procs, rebuilt from introspection data.
      foreach procName [info proc ::*] {
          set formals [lmap formal [info args $procName] {
              if {[info default $procName $formal fallback]} {
                  list $formal $fallback
              } else {
                  set formal
              }
          }]
          thread::send $thread [list proc $procName $formals [info body $procName]]
      }
  }
  97.  
  # ::pipethread::debugLog err
  #
  # Diagnostic logger used by ::pipethread::pipe.  Writes `err` to stderr
  # only when the optional namespace variable ::pipethread::debug exists and
  # holds a true boolean; otherwise it is a no-op, which is the default.
  # The previous body returned before its puts, so logging could only be
  # switched on by editing the source.
  #
  # Always returns the empty string.
  proc ::pipethread::debugLog err {
      if {[info exists ::pipethread::debug]
          && [string is true -strict $::pipethread::debug]} {
          puts stderr $err
      }
      return
  }
  99.  
  # ::pipethread::accumulate fd var
  #
  # Reads whatever `read` yields from channel `fd` and appends it to the
  # variable named `var`.  Callers pass a fully-qualified name, since an
  # unqualified name would refer to a local of this proc.
  #
  # When the read leaves the channel at end-of-file, any readable handler
  # registered on `fd` is removed.  A channel at EOF stays readable forever,
  # so leaving the handler in place makes the event loop call it again and
  # again with empty reads.
  #
  # Returns the accumulated value of `var`.
  proc ::pipethread::accumulate {fd var} {
      set collected [append $var [read $fd]]

      if {[eof $fd]} {
          fileevent $fd readable {}
      }

      return $collected
  }
  103.  
  # ::pipethread::pipe ?-cloneInterpCommand command? stage ?| stage ...?
  #
  # Runs a shell-like pipeline in which every stage executes in its own
  # thread.  Stages are separated by a literal "|" argument.
  #
  # A stage is either
  #   - a command with arguments: the stage's I/O channel name is appended
  #     as the last argument, or
  #   - a single script argument whose list form is brace-quoted, which in
  #     practice means it contains whitespace: the script is run with the
  #     variable `iochan` preset to the stage's I/O channel.
  #
  # Each stage owns the write end of a fresh OS pipe.  Every stage but the
  # first also receives the read end of the previous stage's pipe; the two
  # are joined into one read/write reflected channel.  The read end of the
  # last stage's pipe stays in the calling thread and collects the result.
  #
  # -cloneInterpCommand names a command invoked as `command threadId` for
  # every worker before its stage starts.  It defaults to
  # ::pipethread::_cloneinterp; pass an empty string to skip cloning.
  #
  # Returns everything the last stage wrote to its I/O channel.
  # Raises an error when no stage is given.
  proc ::pipethread::pipe args {
      # Per-invocation handle keeps the namespace variables of concurrent
      # or nested pipelines apart.
      if {![info exists ::pipethread::handles]} {
          set ::pipethread::handles -1
      }
      set handle [incr ::pipethread::handles]
      set resultVar ::pipethread::resultArray_$handle
      set returnVar ::pipethread::returnValue_$handle

      set cloneInterpCommand ::pipethread::_cloneinterp
      if {[lindex $args 0] eq "-cloneInterpCommand"} {
          set cloneInterpCommand [lindex $args 1]
          set args [lrange $args 2 end]
      }

      if {[llength $args] == 0} {
          return -code error "wrong # args: should be \"::pipethread::pipe ?-cloneInterpCommand command? stage ?| stage ...?\""
      }

      # Split the argument list on "|" and allocate a thread and a pipe
      # for every stage.
      set toRun [list]
      for {set idx 0} {$idx < [llength $args]} {incr idx} {
          set end [lsearch -start $idx -exact $args "|"]
          if {$end == -1} {
              set end [llength $args]
          }

          set command [lrange $args $idx $end-1]

          # A one-element list whose string form is brace-quoted is taken
          # to be an inline script rather than a command name.
          if {[llength $command] == 1
              && [string index $command 0] eq "\{"
              && [string index $command end] eq "\}"} {
              set anonymous true
              set command [lindex $command 0]
          } else {
              set anonymous false
          }

          lassign [chan pipe] pipeReader pipeWriter
          set thread [thread::create]

          lappend toRun [list thread $thread anonymous $anonymous command $command pipeReader $pipeReader pipeWriter $pipeWriter]

          set idx $end
      }
      set stageCount [llength $toRun]

      # The last stage's read end is drained from the event loop.  It must
      # be non-blocking: a blocking read inside the handler would wait for
      # EOF, and EOF only happens after this thread has processed the
      # stage's completion and released it, which is a deadlock as soon as
      # the stage flushes more than one channel buffer.
      set finalThreadReader $pipeReader
      chan configure $finalThreadReader -blocking 0
      fileevent $finalThreadReader readable [list ::pipethread::accumulate $finalThreadReader $returnVar]

      # Executed inside each worker: returns the channel the stage should
      # use.  With only one end available that end is used directly;
      # otherwise both ends are tied into one read/write reflected channel.
      set makeIoChan {{inchan outchan} {
          if {$inchan eq ""} {return $outchan}
          if {$outchan eq ""} {return $inchan}

          namespace eval tie {
              proc initialize {in out x mode} {
                  info procs
              }
              proc finalize {in out x} { }
              proc write {in out x data} {
                  puts -nonewline $out $data
                  string length $data
              }
              proc read {in out x len} {
                  ::read $in $len
              }
              proc watch {in out x eventspec} { puts "WATCH: $x $eventspec" }
              namespace export *
              namespace ensemble create -parameters {in out}
          }
          set it [chan create {r w} [list tie $inchan $outchan]]
          chan configure $it -blocking 1 -buffering none
          return $it
      }}

      # Hand the channels over to the workers and start every stage.
      for {set idx 0} {$idx < $stageCount} {incr idx} {
          set stage [lindex $toRun $idx]
          set thread [dict get $stage thread]
          set anonymous [dict get $stage anonymous]
          set command [dict get $stage command]

          # The first stage has no upstream pipe to read from.
          set inchan ""
          if {$idx > 0} {
              set inchan [dict get [lindex $toRun $idx-1] pipeReader]
          }
          set outchan [dict get $stage pipeWriter]

          if {$inchan ne ""} {
              thread::transfer $thread $inchan
          }
          thread::transfer $thread $outchan

          if {$cloneInterpCommand ne ""} {
              $cloneInterpCommand $thread
          }

          set iochan [thread::send $thread [list apply $makeIoChan $inchan $outchan]]

          if {$anonymous} {
              set command "set iochan [list $iochan]\n$command"
          } else {
              lappend command $iochan
          }

          set threadToChan($thread) [list $iochan]

          debugLog "\[$thread\]: -EXEC-> [string map [list "\n" "\\n"] $command]"
          thread::send -async $thread $command ${resultVar}($thread)
      }

      # Reap the stages in completion order.
      for {set finished 0} {$finished < $stageCount} {incr finished} {
          debugLog "\[---\]: WAITING"

          # Only wait when no completion is pending already.
          if {[array size $resultVar] == 0} {
              vwait $resultVar
          }

          set thread [lindex [array names $resultVar] 0]
          unset ${resultVar}($thread)

          debugLog "\[$thread\]: <-EXIT-"

          thread::send -async $thread [list foreach chan $threadToChan($thread) { if {$chan ne ""} { close $chan } }]

          debugLog "\[$thread\]: <-RELEASE-"

          # Make room in the result pipe before blocking in the release
          # below, so the final flush of the exiting thread cannot stall
          # on a full pipe that nobody is reading.
          ::pipethread::accumulate $finalThreadReader $returnVar

          thread::release -wait $thread

          debugLog "\[$thread\]: <-DONE-"
      }

      # Every writer is gone now: read the remainder up to EOF, then free
      # the channel and the per-handle variables.
      chan configure $finalThreadReader -blocking 1
      ::pipethread::accumulate $finalThreadReader $returnVar
      close $finalThreadReader

      set result ""
      if {[info exists $returnVar]} {
          set result [set $returnVar]
          unset $returnVar
      }
      unset -nocomplain $resultVar

      return $result
  }
  255.  
  # a iochan
  #
  # Demo producer stage: announces itself on stdout, writes a fixed set of
  # lines to `iochan`, then announces that it is done.
  proc a {iochan} {
      puts "A: STARTING"
      foreach item {a b what whatwhat c} {
          puts $iochan $item
      }
      puts "A: ENDING"
  }
  265.  
  # b arg iochan
  #
  # Demo filter stage: reads lines from `iochan` until end-of-file and
  # writes back to `iochan` each line that matches the glob pattern
  # "<arg>*".  Every forwarded line is also reported on stdout.
  proc b {arg iochan} {
      puts "B: STARTING"
      set pattern "$arg*"
      while {1} {
          gets $iochan line
          # An empty line only ends the loop when the channel is at EOF.
          if {$line eq "" && [eof $iochan]} {
              break
          }

          if {[string match $pattern $line]} {
              puts $iochan $line
              puts "B: SENDING: $line"
          }
      }
      puts "B: ENDING"
  }
  283.  
  # c iochan
  #
  # Demo consumer stage: reads a single line from `iochan` and reports it
  # on stdout, bracketed by start and end markers.
  proc c {iochan} {
      puts "C: STARTING"
      set line [gets $iochan]
      puts "Got: $line"
      puts "C: ENDING"
  }
  290.  
  # --- Demo driver ---------------------------------------------------------

  # Prefix handed to the filter stage `b` below.
  set what what

  # Single anonymous stage: its output is the pipeline result.
  puts stdout [::pipethread::pipe { puts $iochan ok }]

  # Three named stages: producer, prefix filter, consumer.
  ::pipethread::pipe \
      a | \
      b $what | \
      c
  puts stdout "---"

  # Named stages around an anonymous script stage.
  ::pipethread::pipe \
      a | \
      { puts $iochan ok } | \
      c

  # Two anonymous stages: the second reverses the first one's words and
  # tags each of them.
  puts stdout [::pipethread::pipe \
      { puts $iochan "a"; puts $iochan "b"; puts $iochan "c" } | \
      { foreach item [lreverse [read $iochan]] { puts $iochan $item.1 } }]
  299.  
  300. # ---
  301. # Output:
  302. # A: STARTING
  303. # A: ENDING
  304. # B: STARTING
  305. # C: STARTING
  306. # B: SENDING: what
  307. # B: SENDING: whatwhat
  308. # B: ENDING
  309. # Got: what
  310. # C: ENDING
  311. # ---
  312. # A: STARTING
  313. # A: ENDING
  314. # C: STARTING
  315. # Got: ok
  316. # C: ENDING
  317.  
  318.