plpgsql: restructure I/O to allow input in command mode.
[jackhill/mal.git] / plpgsql / io.sql
CommitLineData
53105a77
JM
-- One row per I/O stream.
--   data:      text currently pending on the stream
--   rl_prompt: prompt for readline input
CREATE TABLE stream (
    stream_id integer,
    data      varchar,
    rl_prompt varchar
);

-- Seed the two streams with empty data and no prompt.
INSERT INTO stream (stream_id, data, rl_prompt)
VALUES
    (0, '', ''),  -- stdin
    (1, '', '');  -- stdout
9
8e266c18
JM
-- dblink is necessary to be able to run sub-transactions (autonomous
-- transactions) against the stream table. This is necessary to be able
-- to modify the stream table from the perspective of outside callers,
-- because the actual code can be long-lived and its direct updates
-- would not be seen until the process completes.
-- IF NOT EXISTS keeps this script from failing when the extension is
-- already installed in the database.
CREATE EXTENSION IF NOT EXISTS dblink;
16
17-- ---------------------------------------------------------
18
-- stream_open:
--     reset stream sid to the "open and empty" state: data and
--     rl_prompt are both set to ''. The update is issued through
--     dblink rather than directly from this function.
CREATE FUNCTION stream_open(sid integer) RETURNS void AS $$
BEGIN
    PERFORM dblink(
        'dbname=mal',
        format('UPDATE stream SET data = '''', rl_prompt = '''' WHERE stream_id = %L', sid)
    );
END;
$$ LANGUAGE plpgsql STRICT;
28
-- stream_close:
--     mark stream sid as closed by setting its data to NULL. The
--     update is issued through dblink rather than directly from this
--     function.
CREATE FUNCTION stream_close(sid integer) RETURNS void AS $$
BEGIN
    PERFORM dblink(
        'dbname=mal',
        format('UPDATE stream SET data = NULL WHERE stream_id = %L', sid)
    );
END;
$$ LANGUAGE plpgsql STRICT;
38
39
-- __read (called via dblink):
--     take whatever data is pending on stream sid. The table is locked
--     so the fetch and the reset to '' happen together.
--     Returns '' when there is no input and NULL when the stream is
--     closed.
CREATE FUNCTION __read(sid integer) RETURNS varchar AS $$
DECLARE
    pending varchar;
BEGIN
    LOCK stream;

    SELECT data INTO pending
    FROM stream
    WHERE stream_id = sid;

    -- A NULL (closed) value makes this comparison NULL, which IF treats
    -- as false, so only real, non-empty data is consumed.
    IF pending <> '' THEN
        UPDATE stream
        SET data = ''
        WHERE stream_id = sid;
    END IF;

    RETURN pending;
END;
$$ LANGUAGE plpgsql STRICT;
52
-- read:
--     read from stream stream_id in the stream table. Polls __read
--     through dblink until there is either data to return or the
--     stream is closed (NULL data). Returns NULL when the stream is
--     closed.
CREATE FUNCTION read(stream_id integer DEFAULT 0)
RETURNS varchar AS $$
DECLARE
    poll_query varchar;
    chunk      varchar;
    delay      real := 0.05;
BEGIN
    poll_query := format('SELECT __read(%L);', stream_id);

    LOOP
        -- atomic get and set to empty
        SELECT cur_data INTO chunk
        FROM dblink('dbname=mal', poll_query) AS t1(cur_data varchar);

        -- Anything other than '' ends the wait: real data, or NULL for
        -- a closed stream.
        IF chunk IS DISTINCT FROM '' THEN
            RETURN chunk;
        END IF;

        -- Nothing yet: sleep, lengthening the delay by 10% each round
        -- while it is still below half a second.
        PERFORM pg_sleep(delay);
        IF delay < 0.5 THEN
            delay := delay * 1.1;
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql STRICT;
53105a77 86
8e266c18
JM
-- read_or_error:
--     same as read, except that a closed stream (NULL result) raises
--     an exception instead of being returned to the caller.
CREATE FUNCTION read_or_error(stream_id integer DEFAULT 0)
RETURNS varchar AS $$
DECLARE
    chunk varchar;
BEGIN
    chunk := read(stream_id);

    IF chunk IS NULL THEN
        RAISE EXCEPTION 'Stream ''%'' is closed', stream_id;
    END IF;

    RETURN chunk;
END;
$$ LANGUAGE plpgsql STRICT;
101
102
53105a77
JM
-- readline:
--     set the readline prompt on the given stream, then wait for
--     readline style input on that stream. Returns the result of
--     read(stream_id), which is NULL once the stream is closed.
CREATE FUNCTION readline(prompt varchar, stream_id integer DEFAULT 0)
RETURNS varchar AS $$
DECLARE
    query varchar;
BEGIN
    -- Set prompt / request readline style input. The update is limited
    -- to the requested stream: without the WHERE clause the prompt was
    -- written to every row, leaving a stale rl_prompt on the other
    -- streams and ignoring the stream_id argument.
    query := format('LOCK stream; UPDATE stream SET rl_prompt = %L WHERE stream_id = %L',
                    prompt, stream_id);
    PERFORM dblink('dbname=mal', query);

    RETURN read(stream_id);
END; $$ LANGUAGE 'plpgsql' STRICT;
53105a77 116
-- write:
--     append data to whatever is already pending on the given stream.
--     The table is locked and updated through dblink rather than
--     directly from this function.
CREATE FUNCTION write(data varchar, stream_id integer DEFAULT 1)
RETURNS void AS $$
BEGIN
    PERFORM dblink(
        'dbname=mal',
        format('LOCK stream; UPDATE stream SET data = data || %L WHERE stream_id = %L',
               data, stream_id)
    );
END;
$$ LANGUAGE plpgsql STRICT;
53105a77 127
-- writeline:
--     write data followed by a newline to the given stream.
CREATE FUNCTION writeline(data varchar, stream_id integer DEFAULT 1)
RETURNS void AS $$
DECLARE
    line varchar;
BEGIN
    line := data || E'\n';
    PERFORM write(line, stream_id);
END;
$$ LANGUAGE plpgsql STRICT;
53105a77
JM
133
-- wait_rl_prompt:
--     wait for rl_prompt to be set on the given stream and return the
--     rl_prompt value. The prompt is only taken while no stream has
--     pending data; taking it also resets it to ''.
CREATE FUNCTION wait_rl_prompt(stream_id integer DEFAULT 0)
RETURNS varchar AS $$
DECLARE
    pending_query varchar;
    take_query    varchar;
    n_pending     integer;
    rl_text       varchar;
    delay         real := 0.05;
BEGIN
    -- counts the streams (any stream) that still hold unread data
    pending_query := 'SELECT count(data) FROM stream WHERE data <> ''''';

    -- fetches a non-empty prompt for this stream and clears it in the
    -- same statement
    take_query := format('LOCK stream; UPDATE stream x SET rl_prompt = '''' FROM (SELECT rl_prompt FROM stream WHERE stream_id = %L AND rl_prompt <> '''') y WHERE x.stream_id = %L AND x.rl_prompt <> '''' RETURNING y.rl_prompt AS rl_prompt', stream_id, stream_id);

    LOOP
        SELECT p INTO n_pending
        FROM dblink('dbname=mal', pending_query) AS t1(p integer);

        IF n_pending = 0 THEN
            -- atomic get and set to empty
            SELECT rl_prompt INTO rl_text
            FROM dblink('dbname=mal', take_query) AS t1(rl_prompt varchar);

            -- rl_text is NULL when no prompt was waiting; the comparison
            -- is then NULL and the loop keeps polling.
            IF rl_text <> '' THEN
                RETURN rl_text;
            END IF;
        END IF;

        -- Sleep, lengthening the delay by 10% each round while it is
        -- still below half a second.
        PERFORM pg_sleep(delay);
        IF delay < 0.5 THEN
            delay := delay * 1.1;
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql STRICT;
53105a77 169
8e266c18
JM
-- wait_flushed:
--     wait until the given stream holds no pending data, i.e. its
--     data is either '' or NULL. The check is polled through dblink.
CREATE FUNCTION wait_flushed(stream_id integer DEFAULT 1)
RETURNS void AS $$
DECLARE
    check_query varchar;
    unflushed   integer;
    delay       real := 0.05;
BEGIN
    check_query := format('SELECT count(data) FROM stream WHERE stream_id = %L AND data IS NOT NULL AND data <> ''''', stream_id);

    LOOP
        SELECT p INTO unflushed
        FROM dblink('dbname=mal', check_query) AS t1(p integer);

        -- done as soon as nothing is left on the stream
        EXIT WHEN unflushed = 0;

        -- Sleep, lengthening the delay by 10% each round while it is
        -- still below half a second.
        PERFORM pg_sleep(delay);
        IF delay < 0.5 THEN
            delay := delay * 1.1;
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql STRICT;
189