1 -- dblink is necessary to be able to run sub-transactions (autonomous
2 -- transactions) on the stream table. This is necessary to be able to
3 -- modify the stream table from the perspective of outside callers
4 -- because actual code can be long-lived and its direct updates will
5 -- not be seen until the process completes.
13 rl_prompt
varchar -- prompt for readline input
-- Seed the two stream rows, both closed and empty. Stream 0 is the
-- default stream of io.read / io.readline; stream 1 is the default
-- stream of io.write / io.writeline.
INSERT INTO io.stream (stream_id, open, data, rl_prompt)
VALUES
    (0, false, '', ''),
    (1, false, '', '');
23 -- ---------------------------------------------------------
-- io.open(sid): mark stream `sid` as open and reset its contents.
--   sid - stream_id of the io.stream row to open.
-- Sets data and rl_prompt to '' and open to true. The UPDATE is sent
-- over a dblink connection ('dbname=mal') instead of being run
-- directly, so it takes effect in its own transaction.
-- STRICT: a NULL sid yields NULL without executing the body.
25 CREATE FUNCTION io.
open(sid
integer) RETURNS void
AS $$
29 --RAISE NOTICE 'io.open start';
-- %L makes format() quote sid as a SQL literal in the remote statement.
30 query
:= format('UPDATE io.stream
31 SET data = '''', rl_prompt = '''', open = true
32 WHERE stream_id = %L', sid
);
-- PERFORM evaluates the dblink call and discards its result.
33 PERFORM
dblink('dbname=mal', query
);
34 --RAISE NOTICE 'io.open done';
35 END; $$
LANGUAGE 'plpgsql' STRICT;
-- io.close(sid): mark stream `sid` as closed.
--   sid - stream_id of the io.stream row to close.
-- Sets rl_prompt to '' and open to false; the data column is not
-- touched by this statement. The UPDATE is sent over a dblink
-- connection ('dbname=mal') so it takes effect in its own transaction.
-- STRICT: a NULL sid yields NULL without executing the body.
37 CREATE FUNCTION io.
close(sid
integer) RETURNS void
AS $$
41 --RAISE NOTICE 'io.close start';
-- %L makes format() quote sid as a SQL literal in the remote statement.
42 query
:= format('UPDATE io.stream
43 SET rl_prompt = '''', open = false
44 WHERE stream_id = %L', sid
);
-- PERFORM evaluates the dblink call and discards its result.
45 PERFORM
dblink('dbname=mal', query
);
46 --RAISE NOTICE 'io.close done';
47 END; $$
LANGUAGE 'plpgsql' STRICT;
50 -- called from read via dblink
-- io.__read(sid): fetch the pending data of stream `sid` and empty it.
--   sid - stream_id of the io.stream row to read.
-- Runs on the remote side of io.read's dblink call, so the fetch and
-- the reset of the data column happen together in that transaction.
-- NOTE(review): io.read documents NULL as the "stream closed" result;
-- confirm that the isopen = false branch below is what produces it.
51 CREATE FUNCTION io.
__read(sid
integer) RETURNS varchar AS $$
-- Load the buffered data and the open flag for this stream.
57 SELECT data, open INTO input, isopen
FROM io.stream
58 WHERE stream_id
= sid
;
-- Reset the stream's data column to the empty string.
60 UPDATE io.stream
SET data = '' WHERE stream_id
= sid
;
63 IF isopen
= false THEN
67 END; $$
LANGUAGE 'plpgsql' STRICT;
70 -- read from stream stream_id in stream table. Waits until there is
71 -- either data to return or the stream closes (NULL data). Returns
72 -- NULL when stream is closed.
--   sid - stream to read from; defaults to 0.
-- Each poll runs io.__read over dblink ('dbname=mal'). Between polls
-- the function sleeps and grows the delay by 10% (see "backoff").
-- STRICT: a NULL sid yields NULL without executing the body.
73 CREATE FUNCTION io.
read(sid
integer DEFAULT 0) RETURNS varchar AS $$
79 -- poll / wait for input
80 query
:= format('SELECT io.__read(%L);', sid
);
84 -- atomic get and set to empty
85 SELECT cur_data
INTO input FROM dblink('dbname=mal', query
)
86 AS t1(cur_data
varchar);
-- Non-empty data, or NULL for a closed stream, is the terminating case.
87 IF input <> '' OR input IS NULL THEN
90 PERFORM
pg_sleep(sleep
);
92 sleep
:= sleep
* 1.1; -- backoff
95 END; $$
LANGUAGE 'plpgsql' STRICT;
98 -- similar to read, but throws exception when stream is closed
-- io.read_or_error(sid): call io.read(sid) and raise if it reports a
-- closed stream.
--   sid - stream to read from; defaults to 0.
-- Raises: 'Stream ''<sid>'' is closed' when io.read returns NULL.
-- NOTE(review): presumably returns the data from io.read otherwise —
-- confirm.
99 CREATE FUNCTION io.
read_or_error(sid
integer DEFAULT 0) RETURNS varchar AS $$
103 input := io.
read(sid
);
-- io.read signals a closed stream with NULL.
104 IF input IS NULL THEN
105 raise
EXCEPTION 'Stream ''%'' is closed', sid
;
109 END; $$
LANGUAGE 'plpgsql' STRICT;
113 -- set prompt and wait for readline style input on the stream
-- io.readline(prompt, sid): request readline-style input on a stream.
--   prompt - prompt text for the readline request.
--   sid    - stream to read from; defaults to 0.
-- Before publishing the prompt it waits for streams 1 and 0 to have no
-- pending data (io.wait_flushed), then updates rl_prompt through dblink.
-- NOTE(review): presumably returns the line read from `sid` — confirm.
114 CREATE FUNCTION io.
readline(prompt
varchar, sid
integer DEFAULT 0)
115 RETURNS varchar AS $$
119 -- set prompt / request readline style input
-- Wait until streams 1 and 0 no longer hold unread data.
121 PERFORM io.
wait_flushed(1);
123 PERFORM io.
wait_flushed(0);
-- The remote statement takes a lock on io.stream before the UPDATE.
125 query
:= format('LOCK io.stream; UPDATE io.stream SET rl_prompt = %L',
127 PERFORM
dblink('dbname=mal', query
);
130 END; $$
LANGUAGE 'plpgsql' STRICT;
-- io.write(data, sid): append text to a stream's data column.
--   data - text to append.
--   sid  - target stream; defaults to 1.
-- The remote statement locks io.stream and then concatenates onto the
-- existing data column (data || <new text>); it is run via dblink
-- ('dbname=mal').
-- STRICT: if either argument is NULL the body is skipped and NULL is
-- returned.
132 CREATE FUNCTION io.
write(data varchar, sid
integer DEFAULT 1)
-- Inside the format string, `data` names the io.stream column of the
-- remote statement, not this function's parameter.
137 query
:= format('LOCK io.stream;
138 UPDATE io.stream SET data = data || %L WHERE stream_id = %L',
140 --RAISE NOTICE 'write query: %', query;
141 PERFORM
dblink('dbname=mal', query
);
142 END; $$
LANGUAGE 'plpgsql' STRICT;
-- io.writeline(data, sid): write `data` followed by a newline.
--   data - text to write.
--   sid  - target stream; defaults to 1.
-- Delegates to io.write with E'\n' appended to the text.
-- STRICT: if either argument is NULL the body is skipped and NULL is
-- returned.
144 CREATE FUNCTION io.
writeline(data varchar, sid
integer DEFAULT 1)
147 PERFORM io.
write(data || E
'\n', sid
);
148 END; $$
LANGUAGE 'plpgsql' STRICT;
150 -- ---------------------------------------------------------
152 -- called from wait_rl_prompt via dblink
-- io.__wait_rl_prompt(sid): one check for a pending readline prompt.
--   sid - stream whose rl_prompt is inspected.
-- Returns '' when there is no prompt to hand out yet.
-- NOTE(review): io.wait_rl_prompt treats a NULL result as "stream
-- closed"; confirm that the isopen = false branch returns NULL.
153 CREATE FUNCTION io.
__wait_rl_prompt(sid
integer) RETURNS varchar AS $$
-- Load the open flag and the current prompt for this stream.
160 SELECT open, rl_prompt
INTO isopen
, prompt
FROM io.stream
161 WHERE stream_id
= sid
;
-- Count streams (any stream_id) that still hold unread data.
162 SELECT count(stream_id
) INTO datas
FROM io.stream
WHERE data <> '';
164 IF isopen
= false THEN
166 --raise EXCEPTION 'Stream ''%'' is closed', sid;
-- The prompt is only consumed when no stream has pending data.
169 IF datas
= 0 AND prompt
<> '' THEN
170 UPDATE io.stream
SET rl_prompt
= '' WHERE stream_id
= sid
;
171 -- There is pending data on some stream
174 RETURN ''; -- '' -> no input
175 END; $$
LANGUAGE 'plpgsql' STRICT;
178 -- wait for rl_prompt to be set on the given stream and return the
179 -- rl_prompt value. Errors if stream is already closed.
--   sid - stream to watch; defaults to 0.
-- Each check runs io.__wait_rl_prompt over dblink ('dbname=mal').
-- Raises: 'Stream ''<sid>'' is closed' when the remote check yields NULL.
-- STRICT: a NULL sid yields NULL without executing the body.
180 CREATE FUNCTION io.
wait_rl_prompt(sid
integer DEFAULT 0) RETURNS varchar AS $$
186 query
:= format('SELECT io.__wait_rl_prompt(%L);', sid
);
189 SELECT rl_prompt
INTO prompt
FROM dblink('dbname=mal', query
)
190 AS t1(rl_prompt
varchar);
-- NULL from the remote check means the stream is closed.
191 IF prompt
IS NULL THEN
192 raise
EXCEPTION 'Stream ''%'' is closed', sid
;
195 sleep
:= 0.05; -- reset sleep timer
198 PERFORM
pg_sleep(sleep
);
200 sleep
:= sleep
* 1.1; -- backoff
203 END; $$
LANGUAGE 'plpgsql' STRICT;
-- io.wait_flushed(sid): wait until stream `sid` has no pending data.
--   sid - stream to wait on; defaults to 1.
-- Checks through dblink ('dbname=mal'), counting io.stream rows for
-- `sid` whose data is not ''. Returns as soon as that count is 0;
-- otherwise sleeps and increases the delay by 10% (backoff).
-- STRICT: a NULL sid yields NULL without executing the body.
205 CREATE FUNCTION io.
wait_flushed(sid
integer DEFAULT 1) RETURNS void
AS $$
-- %L makes format() quote sid as a SQL literal in the remote query.
211 query
:= format('SELECT count(stream_id) FROM io.stream
212 WHERE stream_id = %L AND data <> ''''', sid
);
215 SELECT p
INTO pending
FROM dblink('dbname=mal', query
)
217 IF pending
= 0 THEN RETURN; END IF;
-- Still pending data: sleep before the next check.
218 PERFORM
pg_sleep(sleep
);
220 sleep
:= sleep
* 1.1; -- backoff
223 END; $$
LANGUAGE 'plpgsql' STRICT;