[QUIZ] Process Rings (#135)

R

Ruby Quiz

The three rules of Ruby Quiz:

1. Please do not post any solutions or spoiler discussion for this quiz until
48 hours have passed from the time on this message.

2. Support Ruby Quiz by submitting ideas as often as you can:

http://www.rubyquiz.com/

3. Enjoy!

Suggestion: A [QUIZ] in the subject of emails about the problem helps everyone
on Ruby Talk follow the discussion. Please reply to the original quiz message,
if you can.

-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

I recently wrote about a challenge in the Programming Erlang book on my blog:

http://blog.grayproductions.net/articles/2007/08/13/erlang-message-passing

Language comparison issues aside, just figuring out how to build a ring of
"processes" was quite the brain bender for me. That always makes for good Ruby
Quiz material, in my opinion.

The task is straight forward:

1. Your program should take two command-line arguments: a number of
processes and a number of cycles.
2. Begin by creating the requested number of processes, in a ring.
For example, when three processes are requested, process one
creates and sends messages to process two, which creates and sends
messages to process three. The third process then sends its
messages back to process one.
3. Pass a message around your ring of processes a number of times
equal to the requested cycles. Print timing results for how
long this takes.

The message you pass doesn't much matter. A simple String is fine. You may
also wish to pass a counter with it, to verify the correct number of sends.

I'll leave the definition of "processes" intentionally vague. Ruby doesn't have
an equivalent to Erlang processes so we will just say that each process should
represent a node where we could run some instructions concurrently. Be
creative.
 
T

Tom Danielsen

Ok.... this is my first attempt at using callcc in ruby. The code
is not what I would call beautiful, but should be readable...(??).
Suggestions are most welcome!


#!/usr/bin/env ruby
# vim:et:ts=4:sw=4

$n = 0 if $DEBUG

class RLWP
def initalize
@nxt = nil
end
attr_accessor :nxt
def makecont
passesleft, message = callcc { |cont|
return cont
}
if passesleft <= 0
puts $n if $DEBUG
exit 0
end
$n += 1 if $DEBUG
@nxt.call(passesleft - 1, message)
end
end

def run(n, cycles, msg)
process = Array.new(n) { RLWP.new }
cont = process.collect { |p| p.makecont }
process.each_with_index { |p,i| p.nxt = cont[(i+1) % n] }
cont[0].call(n * cycles, msg)
end

run ARGV[0].to_i, ARGV[1].to_i, "xyzzy"




tom@molly:~/src/quiz-135% time ruby -d quiz-135-callcc.rb 1000 1000
1000000
ruby -d quiz-135-callcc.rb 1000 1000 10.781 user 0.037 system 99% cpu 10.868 total
tom@molly:~/src/quiz-135%


At somewhere around 12000 processes ruby started crashing on my system,
maybe a memory issue?

tom@molly:~/src/quiz-135% time ruby quiz-135-callcc.rb 12000 3
[2] 30579 illegal hardware instruction (core dumped) ruby quiz-135-callcc.rb 12000 3
ruby quiz-135-callcc.rb 12000 3 1.695 user 0.713 system 32% cpu 7.418 total
tom@molly:~/src/quiz-135%



regards,
-tom
 
J

James Edward Gray II

Ok.... this is my first attempt at using callcc in ruby.

Neat idea.

Here's my own version using fork():

#!/usr/bin/env ruby -wKU

unless ARGV.size == 2
abort "Usage: #{File.basename($PROGRAM_NAME)} PROCESSES CYCLES"
end
processes, cycles = ARGV.map { |n| n.to_i }

parent, child = true, false
parent_reader, parent_writer = IO.pipe
reader, writer = IO.pipe
my_reader = parent_reader

puts "Creating #{processes} processes..."
processes.times do |process|
if fork
break
else
parent_reader.close unless parent_reader.closed?
writer.close

parent = false
my_reader = reader
reader, writer = IO.pipe
end
child = true if process == processes - 1
end
if child
puts "Done."
my_writer = parent_writer
else
parent_writer.close
my_writer = writer
end

if parent
puts "Timer started."
start_time = Time.now
puts "Sending a message around the ring #{cycles} times..."
cycles.times do
my_writer.puts "0 Ring message"
my_writer.flush
raise "Failure" unless my_reader.gets =~ /\A#{processes} Ring
message\Z/
end
puts "Done: success."
puts "Time in seconds: #{(Time.now - start_time).to_i}"
else
my_reader.each do |message|
if message =~ /\A(\d+)\s+(.+)/
my_writer.puts "#{$1.to_i + 1} #{$2}"
my_writer.flush
end
end
end

__END__

And here's a threaded attempt:

#!/usr/bin/env ruby -wKU

begin
require "fastthread"
puts "Using the fastthread library."
rescue LoadError
require "thread"
puts "Using the standard Ruby thread library."
end

module MRing
class Forward
def initialize(count, parent)
@child = count.zero? ? parent : Forward.new(count - 1, parent)
@queue = Queue.new

run
end

def send_message(message)
@queue.enq message
end

private

def run
Thread.new do
loop do
message = @queue.deq
if message =~ /\A(\d+)\s+(.+)/
@child.send_message "#{$1.to_i + 1} #{$2}"
end
end
end
end
end

class Parent < Forward
def initialize(processes, cycles)
@processes = processes
@cycles = cycles

puts "Creating #{processes} processes..."
super(processes, self)
end

private

def run
puts "Timer started."
start_time = Time.now
puts "Sending a message around the ring #{@cycles} times..."
@cycles.times do
@child.send_message "0 Ring message"
raise "Failure" unless @queue.deq =~ /\A#{@processes} Ring
message\Z/
end
puts "Done: success."
puts "Time in seconds: #{(Time.now - start_time).to_i}"
end
end
end

if __FILE__ == $PROGRAM_NAME
unless ARGV.size == 2
abort "Usage: #{File.basename($PROGRAM_NAME)} PROCESSES CYCLES"
end
processes, cycles = ARGV.map { |n| n.to_i }

MRing::parent.new(processes, cycles)
end

__END__

James Edward Gray II
 
T

Tom Danielsen

def initalize
@nxt = nil
end

oops, a slight copy/paste error there..... run() will call
nxt= before calling the continuations so the code still runs,
but I like to set my instance vars in the constructor anyway :)

regards,
tom
 
T

Tom Danielsen

I'll resubmit my entry,
- fixed typo "initialize"
- cleaner exit from message passing code


regards,
-tom



#!/usr/bin/env ruby
# vim:et:ts=4:sw=4

$n = 0 if $DEBUG

class RLWP
def initialize
@nxt = nil
end
attr_accessor :nxt
def makecont
passesleft, message = callcc { |cont|
return cont
}
if passesleft <= 0
puts $n if $DEBUG
throw :DONE
end
$n += 1 if $DEBUG
@nxt.call(passesleft - 1, message)
end
end

def run(n, cycles, msg)
catch:)DONE) {
process = Array.new(n) { RLWP.new }
cont = process.collect { |p| p.makecont }
process.each_with_index { |p,i| p.nxt = cont[(i+1) % n] }
cont[0].call(n * cycles, msg)
}
end

run ARGV[0].to_i, ARGV[1].to_i, "xyzzy"
 
R

Robert Dober

It is too rare that I find time to solve the Quiz, but this time I
just had to find that time to make it back, and I enjoyed it as ever
:)
Here goes my shot:


require 'labrador/enum/map'
require 'labrador/exp/open-proto'
require 'thread'

processes, cycles = ARGV.map.to_i

timer = new_proto{
def init
obj_variable :stopped, nil
obj_variable :started, Time.now
obj_variable :ellapsed, nil
end
def read
self.ellapsed ||=
stopped.tv_sec - started.tv_sec + (
stopped.tv_usec - started.tv_usec
) / 1_000_000.0
end
def reset
self.stopped = nil
self.started = Time.now
end
def stop
self.stopped = Time.now
end
}
ring_element = new_proto(Prototype::OpenProto){
define_method :init do |params|
super
obj_variable :thread, Thread.new{
cycles.times do |i|
m = lhs_queue.deq
rhs_queue.enq "thread=#{count}::count=#{i}"
end
}
end
}

startup_timer = timer.new
lqueue = Queue.new
all_processes = (2..processes).map{ |count|
ring_element.new :count => count,
:lhs_queue => lqueue,
:rhs_queue => ( lqueue = Queue.new )
}
all_processes << ring_element.new(
:count => 1,
:lhs_queue => lqueue,
:rhs_queue => all_processes.first.lhs_queue
)
startup_timer.stop
run_timer = timer.new
all_processes.last.lhs_queue.enq "Can you please start"
all_processes.map.thread.map.join
run_timer.stop
puts "Startup time for #{processes} processes: %3.6fs" % startup_timer.read
puts "Runtime for #{processes} processes and #{cycles} cycles: %3.6fs"
% run_timer.read
 
A

Adam Shelly

...
I'll leave the definition of "processes" intentionally vague. Ruby doesn't have
an equivalent to Erlang processes so we will just say that each process should
represent a node where we could run some instructions concurrently. Be
creative.
I've been thinking about virtual machines recently, so I decided to
implement one in Ruby for this quiz.

My solution has 4 parts:
- a pair of programs in an as yet unnamed language. Each program
sends a message to the next process. One program implements a counter
to stop the loop at the end.

- A Compiler for this language. To keep the compiler simple, the
parser has no look-ahead. One result of this is that all operators
have left associativity, so
'n=n+1', 'n=1+n', and 'n=(n+1)' all set n to different values, and
only the last one is unsuprising. The compiler returns an array
containing 'assembly', which is essentially bytecode, except the
codes are ruby symbols, not bytes.

- The InstructionSet, which contains a method for each VM instruction

- The virtual CPU. Schedules and runs a set of Proceses. A Process
executes assembly by sending each symbol to the instruction set.

This solution may be the slowest one submitted, but it was interesting to write.
-Adam

---BEGIN SOLUTION---
#processring.rb
# for Ruby Quiz #135
# Adam Shelly

#Implements a virtual machine and a compiler for a simple language
# language definition:
# types: ints, strings
# variables don't need to be declared in advance (works like ruby)
# only 3 keywords: 'exit', 'if' and 'while',
# the latter two take the form 'keyword (condition) { body }'.
# the parens and brackets are required
# only 2 operators: '+' and '-'.
# 4 builtin functions: '_peek' returns true if any messages waiting
for this proceses
# '_get' returns first pending message.
# '_send(id, message)' sends message to
process with given id
# '_puts(message)' writes message to stdout
# '%' before a name indicates process variable.
# process variables include: %id = current process id
# %last = value of last expression
# strictly left associative, use parentheses to group.
# be careful with assignments: 'n = 1+1' == '(n=1)+1'
# you usually want to do 'n = (1+1)'

# Here are the two programs we will execute.
# this one just forwards any message to the next process
prog1 = <<PROG
while (0) { _get }
while (1) {
if (_peek) {
msg =_get
_send ((%id+1),msg)
}
}
PROG

# This one generates a message and sends it to process 0, n times.
# It will be the last process so we can close the ring.
prog2 = <<PROG
n = _get
_send (0,"chunky bacon")
while (n ) {
if (_peek) {
msg = _get
_send (0,msg)
n = ( n - 1)
_puts ( n )
}
}
_puts ( "done" )
exit
PROG

# The Compiler turns program text into "assembly"
class Compiler
@@symbols = {}
#register keywords
%w{while if end exit}.each{|kw| @@symbols[kw]=kw.to_sym}
#register builtins
%w{_peek _get _send _puts}.each{|bi| @@symbols[bi] = :builtin}

def self.compile code
asm = []
text = code
text = text.dup #don't destroy original code
token,name = parse text
while (token)
p token if $DEBUG
case token
when :while,:if,:exit,:end,:add,:subtract,:assign,:comma
asm << token
when :localvar,:procvar,:builtin,:num,:string
asm << token
asm << name
when :startgroup,:startblock
asm << token
asm << 0 #placeholder for size of group/block
when :endgroup
startgroup = asm.rindex:)startgroup)
asm[startgroup] = :group
asm[startgroup+1] = asm.size-startgroup-2 #store groupsize
when :endblock
startblock = asm.rindex:)startblock)
asm[startblock] = :block
asm[startblock+1] = asm.size-startblock #store blocksize
asm << :endblock
asm << asm.size+1 #placeholder for looptarget (default is next inst.)
end
token,name = parse text
end
return asm
end

private
def self.parse text, vartype = :localvar
pt = 0;
p "parse: #{text}" if $DEBUG
while (true)
case (c = text[pt,1])
when '' #EOF
return nil

when /\s/ #skip whitespace
pt+=1
next

when /\d/ #integers
v = text[pt..-1].to_i
text.slice!(0..pt+v.to_s.length-1) #remove number
return :num,v

when /\w/ #identifiers
name = /\w*/.match(text[pt..-1])[0]
text.slice!(0..pt+name.length-1) #remove name
sym = @@symbols[name]
sym = register_var(name,vartype) if !sym #unknown identifier is variable
return sym,name

when '"' #strings
name = /".*?[^\\]"/m.match(text[pt..-1])[0]
text.slice!(0..pt+name.length-1) #remove name
return :string, name

when '%' #processes variables
text.slice!(0..pt)
token,name = parse text,:procvar
raise "invalid process variable" if token!= :procvar
return token,name

when '=': #punctuation
text.slice!(0..pt)
return :assign, c
when ',':
text.slice!(0..pt)
return :comma,c
when '+'
text.slice!(0..pt)
return :add,'+'
when '-'
text.slice!(0..pt)
return :subtract,'-'
when '('
text.slice!(0..pt)
return :startgroup, c
when ')'
text.slice!(0..pt)
return :endgroup, c
when '{'
text.slice!(0..pt)
return :startblock, c
when '}'
text.slice!(0..pt)
return :endblock, c
end #case
end #while
end

def self.register_var name,type
@@symbols[name] = type
end
end


#The cpu instruction set.
#each instruction is the equivalent of a VM bytecode.
class InstructionSet
def initialize cpu
@cpu = cpu
end

def exit proc #halt the cpu
@cpu.halt
end
def end proc #end the current process
@cpu.end_process proc.id
end

def while proc
loopp = proc.pc-1
test = proc.exec
blocksize = proc.exec
if test && test != 0
#if we are going to loop, store the loop start address at the
end of the block
proc.pm[proc.pc+blocksize-1] = loopp
else
proc.pc += blocksize
end
end
def if proc
test = proc.exec
blocksize = proc.exec
if !test || test == 0
proc.pc += blocksize
end
end

def block proc
blocksize = proc.pop
end
def endblock proc
jumptarg = proc.pop #after block, maybe jump somewhere
proc.pc = jumptarg
end

def group proc
groupsize = proc.pop
endgroup = proc.pc+groupsize
while (proc.pc < endgroup)
val = proc.exec
end
return val
end

def num proc
proc.pop
end
def string proc
proc.pop
end
def builtin proc
inst = proc.pop
@cpu.send(inst,proc)
end
def localvar proc
varname = proc.pop
proc.getvar varname
end
def procvar proc
varname = proc.pop
proc.send varname
end
def assign proc
proc.setvar(proc.exec)
end
def comma proc
return :comma
end
def add proc
return proc.last + proc.exec
end
def subtract proc
return proc.last - proc.exec
end

#returns elements of group as array
#used to evaluate arguments for function call
def ungroup proc
args = []
proc.pop #ignore :group
groupsize = proc.pop
endgroup = proc.pc+groupsize
while (proc.pc < endgroup)
arg = proc.exec
args << arg unless arg == :comma
end
return args
end
end


#the CPU
# acts as process scheduler
# processes run for TIMESLICE instructions, or until they send or get a message.
# in the latter case, control switches to the process with a message
pending for the longest time
class CPU
TIMESLICE = 10

# CProcess is a process on our virtual machine
# don't create directly, use CPU#add_process
class CProcess
attr_accessor :pm,:pc,:id,:last
def initialize id, code, vm
@id = id
@pm = code #program memory
@pc = 0 #program counter
@vars = {}
@curvar = nil
@vm = vm
end

#executes a VM instruction, advances program counter
def exec
inst = @pm[@pc]
p to_s if $DEBUG
@pc+=1
@last = @vm.send(inst,self)
end
def pop
@pc+=1
@pm[@pc-1]
end

def getvar name
@curvar = name
@vars[name]||=0
end
def setvar value
@vars[@curvar] = value
end

def to_s
"#{@id}@#{@pc}: #{@pm[@pc]} (#{@pm[@pc+1]})"
end
end #class Process

def initialize
@processes = []
@messages = []
@i = InstructionSet.new self
@queue=[[],[]] #scheduling queues
end

def add_process code
asm = code.dup
asm << :end
id = @processes.size
@processes << CProcess.new(id, asm,@i)
@messages[id] = []
@queue[0] << id
@cur_proc_id = id
end

#stop processes by swapping it out if it is running, and removing it
from queues.
def end_process id
taskswap 0 if @cur_proc_id == id
@processes -= [id]
@queue[0] -= [id]
@queue[1] -= [id]
end

def start
@running = true
run
end
def halt
@running = false
end

#inject a message into the system
def send_msg proc_id,msg
@messages[proc_id]<< msg
@queue[1]<<proc_id
end

private
#run the scheduler
def run
@timeslice = 0
while (@running)
@processes[@cur_proc_id].exec
@timeslice+=1
if (@timeslice > TIMESLICE)
taskswap 0
end
end
end

#switch to the next process waiting at this priority level
def taskswap priority
@cur_proc_id = @queue[priority].shift||@cur_proc_id
(@queue[priority] << @cur_proc_id) if priority == 0
@timeslice = 0
end

## built-in messaging functions
def _peek proc
@messages[proc.id][0]
end
def _get proc
retval = @messages[proc.id].shift
taskswap 1
return retval
end
def _send proc
#send puts the target process on the high priority queue
args = @i.ungroup proc
@messages[args[0]] << args[1]
@queue[1]<<args[0]
taskswap 1
args[1]
end

def _puts proc
args = @i.ungroup proc
puts args
end
end

if __FILE__ == $0
puts "usage: #{$0} processes cycles" or exit if ARGV.size < 2
processes, cycles = ARGV.map { |n| n.to_i }

puts "Timer started."
start_time = Time.now
puts "Creating #{processes} processes"

code1 = Compiler.compile prog1
code2 = Compiler.compile prog2
cpu = CPU.new
(processes-1).times { cpu.add_process code1 }
last_proc = cpu.add_process code2

puts "Sending a message around the ring #{cycles} times..."
cpu.send_msg last_proc,cycles
cpu.start
puts "Time in seconds: #{(Time.now - start_time)}"
end
 
J

James Edward Gray II

I've been thinking about virtual machines recently, so I decided to
implement one in Ruby for this quiz.

That gets my vote for the craziest Ruby Quiz solution ever. Wow!

You have no idea how much I regret that I wrote the quiz summary
earlier today. :(

James Edward Gray II
 
A

Adam Shelly

That gets my vote for the craziest Ruby Quiz solution ever. Wow!

You have no idea how much I regret that I wrote the quiz summary
earlier today. :(
You have no idea how much I regret that I had a bunch of real work to
do and didn't have time to finish it up sooner. :)

I also wish I had time and a valid reason to keep working on extending
the VM. The first thing I'd do is find a good name for the
language...

-Adam
 
T

Tom

Hey guys... here's my attempt. I'm a bit of n00b, so feedback
welcome!

I attacked this with DRb (distributed ruby), since it hadn't been
touched yet. It meant taking a serious hit in performance, but it was
at least interesting. It's threaded, but could be forked (haven't
tried that just yet).

Thanks!

---BEGIN SOLUTION---
# Num. 135
# process_rings.rb
require 'drb'

BasePort = 7654

class RingParent
def initialize(processes = 3, cycles = 5)
@processes = processes
@cycles = cycles
@message = "Message from parent\n"
end

def start
spawn_processes
connect_ring
send_messages
end

def spawn_processes
t = []
for i in 0...@processes-1
t << Thread.new do
RingMember.new(BasePort+i, BasePort+i+1, self)
end
end
t << Thread.new do
RingMember.new(BasePort+@processes-1, BasePort)
end
end

def connect_ring
DRb.start_service
@ring = DRbObject.new(nil, "druby://127.0.0.1:#{BasePort}")
end

def send_messages
@start = Time.now
@cycles.times do
@ring.parent_receive("Hi ring!")
end
end

def return_message(message)
puts "Parent: Got message back- circulation time: #{Time.now -
@start}"
end
end

class RingMember
def initialize(port, next_port, parent = nil)
@port = port
@parent = parent
@current_message = ""
@next_member = connect_next(next_port)
DRBService.new(self, @port)
end

def connect_next(port)
DRb.start_service
DRbObject.new(nil, "druby://127.0.0.1:#{port}")
end

def parent_receive(message)
@current_message = message
forward_message(@current_message)
end

def receive_message(message)
begin
message == @current_message ?
(@parent.return_message(message);(@current_message = "")) :
forward_message(message)
rescue
puts "#{@port}: Received duplicate message, couldn't talk to
parent: #{$!}"
end
end

def forward_message(message)
@next_member.receive_message(message)
end

def test(message)
return "#{@port}: Got message #{message}"
end
end

class DRBService
def initialize(process, port)
DRb.start_service("druby://:#{port}", process)
DRb.thread.join
end
end

processes = ARGV[0].to_i
cycles = ARGV[1].to_i
parent = RingParent.new(processes, cycles)
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

Forum statistics

Threads
473,755
Messages
2,569,537
Members
45,020
Latest member
GenesisGai

Latest Threads

Top