DISABLE FDs (REMOVE ME).
[jackhill/mal.git] / plpgsql / io.sql
-- dblink is necessary to be able to run sub-transactions (autonomous
-- transactions) against the stream table. This is necessary so that
-- modifications of the stream table are visible to outside callers,
-- because the calling code can be long-lived and its direct updates
-- would not be seen until the process completes.
6
-- Schema that holds the stream table and the io.* functions.
CREATE SCHEMA io;

-- One row per stream; the io.* functions read and update these rows.
CREATE TABLE io.stream (
    stream_id integer,
    open      boolean,
    data      varchar,
    rl_prompt varchar  -- prompt for readline input
);

-- Seed the two standard streams, both closed and empty.
INSERT INTO io.stream (stream_id, open, data, rl_prompt)
VALUES
    (0, false, '', ''),  -- stdin
    (1, false, '', '');  -- stdout
22
23 -- ---------------------------------------------------------
24
-- open:
-- mark stream sid as open and reset its data and rl_prompt to ''.
-- The UPDATE is issued through a dblink connection to database mal.
CREATE FUNCTION io.open(sid integer) RETURNS void AS $$
BEGIN
    PERFORM dblink(
        'dbname=mal',
        format('UPDATE io.stream
                   SET data = '''', rl_prompt = '''', open = true
                 WHERE stream_id = %L', sid));
END; $$ LANGUAGE plpgsql STRICT;
36
-- close:
-- mark stream sid as closed and clear its rl_prompt. Any data still
-- stored on the stream is left untouched. The UPDATE is issued through
-- a dblink connection to database mal.
CREATE FUNCTION io.close(sid integer) RETURNS void AS $$
BEGIN
    PERFORM dblink(
        'dbname=mal',
        format('UPDATE io.stream
                   SET rl_prompt = '''', open = false
                 WHERE stream_id = %L', sid));
END; $$ LANGUAGE plpgsql STRICT;
48
49
-- __read:
-- called from read via dblink. Locks io.stream, then for stream sid:
--   * non-empty data: resets data to '' and returns the old value;
--   * no data and stream closed: returns NULL;
--   * otherwise: returns the data column as found ('' while an open
--     stream has nothing stored).
CREATE FUNCTION io.__read(sid integer) RETURNS varchar AS $$
DECLARE
    buffered varchar;
    is_open  boolean;
BEGIN
    LOCK io.stream;

    SELECT s.data, s.open
      INTO buffered, is_open
      FROM io.stream AS s
     WHERE s.stream_id = sid;

    IF buffered <> '' THEN
        -- consume: hand the data out and leave the column empty
        UPDATE io.stream SET data = '' WHERE stream_id = sid;
    ELSIF is_open = false THEN
        buffered := NULL;
    END IF;

    RETURN buffered;
END; $$ LANGUAGE plpgsql STRICT;
68
-- read:
-- poll stream sid (default 0) through io.__read until there is either
-- data to return or the stream is closed. Returns the data, or NULL
-- when the stream is closed. The pause between polls starts at 0.05s
-- and is multiplied by 1.1 after each poll while it is below 0.5s.
CREATE FUNCTION io.read(sid integer DEFAULT 0) RETURNS varchar AS $$
DECLARE
    poll_sql varchar := format('SELECT io.__read(%L);', sid);
    chunk    varchar;
    delay    real := 0.05;
BEGIN
    LOOP
        -- atomic get and set to empty
        SELECT r.cur_data INTO chunk
          FROM dblink('dbname=mal', poll_sql) AS r(cur_data varchar);

        -- '' means the stream is open but has nothing yet: keep polling
        IF chunk IS NULL OR chunk <> '' THEN
            RETURN chunk;
        END IF;

        PERFORM pg_sleep(delay);
        IF delay < 0.5 THEN
            delay := delay * 1.1;  -- backoff
        END IF;
    END LOOP;
END; $$ LANGUAGE plpgsql STRICT;
96
-- read_or_error:
-- same as read, except that a closed stream (NULL from io.read) raises
-- an exception instead of being returned to the caller.
CREATE FUNCTION io.read_or_error(sid integer DEFAULT 0) RETURNS varchar AS $$
DECLARE
    result varchar;
BEGIN
    result := io.read(sid);
    IF result IS NOT NULL THEN
        RETURN result;
    END IF;
    RAISE EXCEPTION 'Stream ''%'' is closed', sid;
END; $$ LANGUAGE plpgsql STRICT;
110
111
-- readline:
-- set the readline prompt on stream sid (default 0) and wait for
-- readline style input on that stream.
--   prompt: prompt text stored in rl_prompt for stream sid
--   sid:    stream to prompt on and read from
-- Returns what io.read(sid) returns: the input, or NULL when the
-- stream is closed.
CREATE FUNCTION io.readline(prompt varchar, sid integer DEFAULT 0)
RETURNS varchar AS $$
DECLARE
    query varchar;
BEGIN
    -- before prompting on one of the two standard streams, wait until
    -- the other one has no unread data left
    IF sid = 0 THEN
        PERFORM io.wait_flushed(1);
    ELSIF sid = 1 THEN
        PERFORM io.wait_flushed(0);
    END IF;

    -- set prompt / request readline style input.
    -- The UPDATE is restricted to stream sid: io.__wait_rl_prompt only
    -- reads and clears the prompt of the stream it is asked about, so
    -- an unfiltered UPDATE would leave stale prompts on other streams.
    query := format('LOCK io.stream;
                     UPDATE io.stream SET rl_prompt = %L
                     WHERE stream_id = %L',
                    prompt, sid);
    PERFORM dblink('dbname=mal', query);

    RETURN io.read(sid);
END; $$ LANGUAGE plpgsql STRICT;
131
-- write:
-- append data to the data column of stream sid (default 1). The
-- LOCK + UPDATE pair is issued through a dblink connection to
-- database mal.
CREATE FUNCTION io.write(data varchar, sid integer DEFAULT 1)
RETURNS void AS $$
BEGIN
    PERFORM dblink(
        'dbname=mal',
        format('LOCK io.stream;
                UPDATE io.stream SET data = data || %L WHERE stream_id = %L',
               data, sid));
END; $$ LANGUAGE plpgsql STRICT;
143
-- writeline:
-- like write, but a newline is appended to data before it is written
-- to stream sid (default 1).
CREATE FUNCTION io.writeline(data varchar, sid integer DEFAULT 1)
RETURNS void AS $$
DECLARE
    line varchar := data || E'\n';
BEGIN
    PERFORM io.write(line, sid);
END; $$ LANGUAGE plpgsql STRICT;
149
150 -- ---------------------------------------------------------
151
-- __wait_rl_prompt:
-- called from wait_rl_prompt via dblink. Locks io.stream, then for
-- stream sid:
--   * stream closed: returns NULL;
--   * no stream has unread data and a prompt is set: clears the prompt
--     and returns it;
--   * otherwise: returns '' (nothing to hand out yet).
CREATE FUNCTION io.__wait_rl_prompt(sid integer) RETURNS varchar AS $$
DECLARE
    is_open        boolean;
    pending_prompt varchar;
    busy_streams   integer;
BEGIN
    LOCK io.stream;

    SELECT s.open, s.rl_prompt
      INTO is_open, pending_prompt
      FROM io.stream AS s
     WHERE s.stream_id = sid;

    IF is_open = false THEN
        RETURN NULL;
    END IF;

    -- number of streams (any stream, not just sid) still holding data
    SELECT count(s.stream_id)
      INTO busy_streams
      FROM io.stream AS s
     WHERE s.data <> '';

    IF busy_streams = 0 AND pending_prompt <> '' THEN
        -- nothing pending anywhere: consume the prompt
        UPDATE io.stream SET rl_prompt = '' WHERE stream_id = sid;
        RETURN pending_prompt;
    END IF;

    RETURN '';  -- '' -> no input
END; $$ LANGUAGE plpgsql STRICT;
176
-- wait_rl_prompt:
-- poll io.__wait_rl_prompt for stream sid (default 0) until it hands
-- out a prompt, and return that prompt. Raises an exception if the
-- stream is closed. The pause between polls starts at 0.05s and is
-- multiplied by 1.1 after each poll while it is below 0.5s.
CREATE FUNCTION io.wait_rl_prompt(sid integer DEFAULT 0) RETURNS varchar AS $$
DECLARE
    poll_sql varchar := format('SELECT io.__wait_rl_prompt(%L);', sid);
    answer   varchar;
    delay    real := 0.05;
BEGIN
    LOOP
        SELECT r.rl_prompt INTO answer
          FROM dblink('dbname=mal', poll_sql) AS r(rl_prompt varchar);

        IF answer IS NULL THEN
            RAISE EXCEPTION 'Stream ''%'' is closed', sid;
        ELSIF answer <> '' THEN
            RETURN answer;
        END IF;

        -- '' -> no prompt available yet: sleep, then poll again
        PERFORM pg_sleep(delay);
        IF delay < 0.5 THEN
            delay := delay * 1.1;  -- backoff
        END IF;
    END LOOP;
END; $$ LANGUAGE plpgsql STRICT;
204
-- wait_flushed:
-- poll (through dblink) until stream sid (default 1) has no row with
-- non-empty data, then return. The pause between polls starts at
-- 0.05s and is multiplied by 1.1 after each poll while it is below
-- 0.5s.
CREATE FUNCTION io.wait_flushed(sid integer DEFAULT 1) RETURNS void AS $$
DECLARE
    check_sql varchar := format('SELECT count(stream_id) FROM io.stream
                                  WHERE stream_id = %L AND data <> ''''', sid);
    unread    integer;
    delay     real := 0.05;
BEGIN
    LOOP
        SELECT c.p INTO unread
          FROM dblink('dbname=mal', check_sql) AS c(p integer);

        EXIT WHEN unread = 0;

        PERFORM pg_sleep(delay);
        IF delay < 0.5 THEN
            delay := delay * 1.1;  -- backoff
        END IF;
    END LOOP;
END; $$ LANGUAGE plpgsql STRICT;
224