DISABLE FDs (REMOVE ME).
[jackhill/mal.git] / plpgsql / io.sql
CommitLineData
8e266c18
JM
1-- dblink is necessary to be able to sub-transactions (autonomous
2-- transactions) to the stream table. This is necessary to be able to
3-- modify the stream table from the perspective of outside callers
4-- because actual code can be long-lived and it's direct updates will
5-- not be seen until the process completes.
494792ab
JM
6
7CREATE SCHEMA io
8
9 CREATE TABLE stream (
10 stream_id integer,
11 open boolean,
12 data varchar,
13 rl_prompt varchar -- prompt for readline input
14 );
15
16-- stdin
17INSERT INTO io.stream (stream_id, open, data, rl_prompt)
18 VALUES (0, false, '', '');
19-- stdout
20INSERT INTO io.stream (stream_id, open, data, rl_prompt)
21 VALUES (1, false, '', '');
8e266c18
JM
22
23-- ---------------------------------------------------------
24
494792ab 25CREATE FUNCTION io.open(sid integer) RETURNS void AS $$
8e266c18
JM
26DECLARE
27 query varchar;
28BEGIN
494792ab
JM
29 --RAISE NOTICE 'io.open start';
30 query := format('UPDATE io.stream
31 SET data = '''', rl_prompt = '''', open = true
32 WHERE stream_id = %L', sid);
8e266c18 33 PERFORM dblink('dbname=mal', query);
494792ab 34 --RAISE NOTICE 'io.open done';
8e266c18
JM
35END; $$ LANGUAGE 'plpgsql' STRICT;
36
494792ab 37CREATE FUNCTION io.close(sid integer) RETURNS void AS $$
8e266c18
JM
38DECLARE
39 query varchar;
40BEGIN
494792ab
JM
41 --RAISE NOTICE 'io.close start';
42 query := format('UPDATE io.stream
43 SET rl_prompt = '''', open = false
44 WHERE stream_id = %L', sid);
8e266c18 45 PERFORM dblink('dbname=mal', query);
494792ab 46 --RAISE NOTICE 'io.close done';
8e266c18
JM
47END; $$ LANGUAGE 'plpgsql' STRICT;
48
49
767d735d 50-- called from read via dblink
494792ab 51CREATE FUNCTION io.__read(sid integer) RETURNS varchar AS $$
8e266c18 52DECLARE
494792ab
JM
53 input varchar;
54 isopen boolean;
8e266c18 55BEGIN
494792ab
JM
56 LOCK io.stream;
57 SELECT data, open INTO input, isopen FROM io.stream
58 WHERE stream_id = sid;
59 IF input <> '' THEN
60 UPDATE io.stream SET data = '' WHERE stream_id = sid;
61 RETURN input;
62 END IF;
63 IF isopen = false THEN
64 RETURN NULL;
8e266c18 65 END IF;
494792ab 66 RETURN input;
8e266c18
JM
67END; $$ LANGUAGE 'plpgsql' STRICT;
68
69-- read:
70-- read from stream stream_id in stream table. Waits until there is
71-- either data to return or the stream closes (NULL data). Returns
72-- NULL when stream is closed.
494792ab 73CREATE FUNCTION io.read(sid integer DEFAULT 0) RETURNS varchar AS $$
53105a77
JM
74DECLARE
75 query varchar;
76 input varchar;
77 sleep real = 0.05;
78BEGIN
79 -- poll / wait for input
494792ab
JM
80 query := format('SELECT io.__read(%L);', sid);
81
53105a77
JM
82 WHILE true
83 LOOP
84 -- atomic get and set to empty
85 SELECT cur_data INTO input FROM dblink('dbname=mal', query)
86 AS t1(cur_data varchar);
8e266c18 87 IF input <> '' OR input IS NULL THEN
53105a77
JM
88 RETURN input;
89 END IF;
53105a77
JM
90 PERFORM pg_sleep(sleep);
91 IF sleep < 0.5 THEN
92 sleep := sleep * 1.1; -- backoff
93 END IF;
94 END LOOP;
97c0256d 95END; $$ LANGUAGE 'plpgsql' STRICT;
53105a77 96
8e266c18
JM
97-- read_or_error:
98-- similar to read, but throws exception when stream is closed
494792ab 99CREATE FUNCTION io.read_or_error(sid integer DEFAULT 0) RETURNS varchar AS $$
8e266c18
JM
100DECLARE
101 input varchar;
102BEGIN
494792ab 103 input := io.read(sid);
8e266c18 104 IF input IS NULL THEN
494792ab 105 raise EXCEPTION 'Stream ''%'' is closed', sid;
8e266c18
JM
106 ELSE
107 RETURN input;
108 END IF;
109END; $$ LANGUAGE 'plpgsql' STRICT;
110
111
53105a77
JM
112-- readline:
113-- set prompt and wait for readline style input on the stream
494792ab
JM
114CREATE FUNCTION io.readline(prompt varchar, sid integer DEFAULT 0)
115 RETURNS varchar AS $$
53105a77
JM
116DECLARE
117 query varchar;
118BEGIN
119 -- set prompt / request readline style input
494792ab
JM
120 IF sid = 0 THEN
121 PERFORM io.wait_flushed(1);
122 ELSIF sid = 1 THEN
123 PERFORM io.wait_flushed(0);
124 END IF;
125 query := format('LOCK io.stream; UPDATE io.stream SET rl_prompt = %L',
126 prompt);
53105a77
JM
127 PERFORM dblink('dbname=mal', query);
128
494792ab 129 RETURN io.read(sid);
97c0256d 130END; $$ LANGUAGE 'plpgsql' STRICT;
53105a77 131
494792ab 132CREATE FUNCTION io.write(data varchar, sid integer DEFAULT 1)
53105a77
JM
133RETURNS void AS $$
134DECLARE
135 query varchar;
136BEGIN
494792ab
JM
137 query := format('LOCK io.stream;
138 UPDATE io.stream SET data = data || %L WHERE stream_id = %L',
139 data, sid);
53105a77
JM
140 --RAISE NOTICE 'write query: %', query;
141 PERFORM dblink('dbname=mal', query);
97c0256d 142END; $$ LANGUAGE 'plpgsql' STRICT;
53105a77 143
494792ab 144CREATE FUNCTION io.writeline(data varchar, sid integer DEFAULT 1)
53105a77
JM
145RETURNS void AS $$
146BEGIN
494792ab 147 PERFORM io.write(data || E'\n', sid);
97c0256d 148END; $$ LANGUAGE 'plpgsql' STRICT;
53105a77 149
767d735d
JM
150-- ---------------------------------------------------------
151
152-- called from wait_rl_prompt via dblink
494792ab 153CREATE FUNCTION io.__wait_rl_prompt(sid integer) RETURNS varchar AS $$
767d735d 154DECLARE
494792ab 155 isopen boolean;
767d735d 156 prompt varchar;
494792ab 157 datas integer;
767d735d 158BEGIN
494792ab
JM
159 LOCK io.stream;
160 SELECT open, rl_prompt INTO isopen, prompt FROM io.stream
161 WHERE stream_id = sid;
162 SELECT count(stream_id) INTO datas FROM io.stream WHERE data <> '';
163
164 IF isopen = false THEN
767d735d
JM
165 return NULL;
166 --raise EXCEPTION 'Stream ''%'' is closed', sid;
167 END IF;
494792ab
JM
168
169 IF datas = 0 AND prompt <> '' THEN
170 UPDATE io.stream SET rl_prompt = '' WHERE stream_id = sid;
767d735d 171 -- There is pending data on some stream
494792ab 172 RETURN prompt;
767d735d 173 END IF;
494792ab 174 RETURN ''; -- '' -> no input
767d735d
JM
175END; $$ LANGUAGE 'plpgsql' STRICT;
176
53105a77
JM
177-- wait_rl_prompt:
178-- wait for rl_prompt to be set on the given stream and return the
767d735d 179-- rl_prompt value. Errors if stream is already closed.
494792ab 180CREATE FUNCTION io.wait_rl_prompt(sid integer DEFAULT 0) RETURNS varchar AS $$
53105a77 181DECLARE
53105a77
JM
182 query varchar;
183 prompt varchar;
184 sleep real = 0.05;
185BEGIN
494792ab 186 query := format('SELECT io.__wait_rl_prompt(%L);', sid);
53105a77
JM
187 WHILE true
188 LOOP
767d735d
JM
189 SELECT rl_prompt INTO prompt FROM dblink('dbname=mal', query)
190 AS t1(rl_prompt varchar);
191 IF prompt IS NULL THEN
494792ab 192 raise EXCEPTION 'Stream ''%'' is closed', sid;
767d735d
JM
193 END IF;
194 IF prompt <> '' THEN
195 sleep := 0.05; -- reset sleep timer
196 RETURN prompt;
53105a77
JM
197 END IF;
198 PERFORM pg_sleep(sleep);
199 IF sleep < 0.5 THEN
200 sleep := sleep * 1.1; -- backoff
201 END IF;
202 END LOOP;
97c0256d 203END; $$ LANGUAGE 'plpgsql' STRICT;
53105a77 204
494792ab 205CREATE FUNCTION io.wait_flushed(sid integer DEFAULT 1) RETURNS void AS $$
8e266c18
JM
206DECLARE
207 query varchar;
208 pending integer;
209 sleep real = 0.05;
210BEGIN
494792ab
JM
211 query := format('SELECT count(stream_id) FROM io.stream
212 WHERE stream_id = %L AND data <> ''''', sid);
8e266c18
JM
213 WHILE true
214 LOOP
215 SELECT p INTO pending FROM dblink('dbname=mal', query)
216 AS t1(p integer);
217 IF pending = 0 THEN RETURN; END IF;
218 PERFORM pg_sleep(sleep);
219 IF sleep < 0.5 THEN
220 sleep := sleep * 1.1; -- backoff
221 END IF;
222 END LOOP;
223END; $$ LANGUAGE 'plpgsql' STRICT;
224