Commit | Line | Data |
---|---|---|
8e266c18 JM |
1 | -- dblink is necessary to be able to sub-transactions (autonomous |
2 | -- transactions) to the stream table. This is necessary to be able to | |
3 | -- modify the stream table from the perspective of outside callers | |
4 | -- because actual code can be long-lived and it's direct updates will | |
5 | -- not be seen until the process completes. | |
494792ab JM |
6 | |
7 | CREATE SCHEMA io | |
8 | ||
9 | CREATE TABLE stream ( | |
10 | stream_id integer, | |
11 | open boolean, | |
12 | data varchar, | |
13 | rl_prompt varchar -- prompt for readline input | |
14 | ); | |
15 | ||
16 | -- stdin | |
17 | INSERT INTO io.stream (stream_id, open, data, rl_prompt) | |
18 | VALUES (0, false, '', ''); | |
19 | -- stdout | |
20 | INSERT INTO io.stream (stream_id, open, data, rl_prompt) | |
21 | VALUES (1, false, '', ''); | |
8e266c18 JM |
22 | |
23 | -- --------------------------------------------------------- | |
24 | ||
494792ab | 25 | CREATE FUNCTION io.open(sid integer) RETURNS void AS $$ |
8e266c18 JM |
26 | DECLARE |
27 | query varchar; | |
28 | BEGIN | |
494792ab JM |
29 | --RAISE NOTICE 'io.open start'; |
30 | query := format('UPDATE io.stream | |
31 | SET data = '''', rl_prompt = '''', open = true | |
32 | WHERE stream_id = %L', sid); | |
8e266c18 | 33 | PERFORM dblink('dbname=mal', query); |
494792ab | 34 | --RAISE NOTICE 'io.open done'; |
8e266c18 JM |
35 | END; $$ LANGUAGE 'plpgsql' STRICT; |
36 | ||
494792ab | 37 | CREATE FUNCTION io.close(sid integer) RETURNS void AS $$ |
8e266c18 JM |
38 | DECLARE |
39 | query varchar; | |
40 | BEGIN | |
494792ab JM |
41 | --RAISE NOTICE 'io.close start'; |
42 | query := format('UPDATE io.stream | |
43 | SET rl_prompt = '''', open = false | |
44 | WHERE stream_id = %L', sid); | |
8e266c18 | 45 | PERFORM dblink('dbname=mal', query); |
494792ab | 46 | --RAISE NOTICE 'io.close done'; |
8e266c18 JM |
47 | END; $$ LANGUAGE 'plpgsql' STRICT; |
48 | ||
49 | ||
767d735d | 50 | -- called from read via dblink |
494792ab | 51 | CREATE FUNCTION io.__read(sid integer) RETURNS varchar AS $$ |
8e266c18 | 52 | DECLARE |
494792ab JM |
53 | input varchar; |
54 | isopen boolean; | |
8e266c18 | 55 | BEGIN |
494792ab JM |
56 | LOCK io.stream; |
57 | SELECT data, open INTO input, isopen FROM io.stream | |
58 | WHERE stream_id = sid; | |
59 | IF input <> '' THEN | |
60 | UPDATE io.stream SET data = '' WHERE stream_id = sid; | |
61 | RETURN input; | |
62 | END IF; | |
63 | IF isopen = false THEN | |
64 | RETURN NULL; | |
8e266c18 | 65 | END IF; |
494792ab | 66 | RETURN input; |
8e266c18 JM |
67 | END; $$ LANGUAGE 'plpgsql' STRICT; |
68 | ||
69 | -- read: | |
70 | -- read from stream stream_id in stream table. Waits until there is | |
71 | -- either data to return or the stream closes (NULL data). Returns | |
72 | -- NULL when stream is closed. | |
494792ab | 73 | CREATE FUNCTION io.read(sid integer DEFAULT 0) RETURNS varchar AS $$ |
53105a77 JM |
74 | DECLARE |
75 | query varchar; | |
76 | input varchar; | |
77 | sleep real = 0.05; | |
78 | BEGIN | |
79 | -- poll / wait for input | |
494792ab JM |
80 | query := format('SELECT io.__read(%L);', sid); |
81 | ||
53105a77 JM |
82 | WHILE true |
83 | LOOP | |
84 | -- atomic get and set to empty | |
85 | SELECT cur_data INTO input FROM dblink('dbname=mal', query) | |
86 | AS t1(cur_data varchar); | |
8e266c18 | 87 | IF input <> '' OR input IS NULL THEN |
53105a77 JM |
88 | RETURN input; |
89 | END IF; | |
53105a77 JM |
90 | PERFORM pg_sleep(sleep); |
91 | IF sleep < 0.5 THEN | |
92 | sleep := sleep * 1.1; -- backoff | |
93 | END IF; | |
94 | END LOOP; | |
97c0256d | 95 | END; $$ LANGUAGE 'plpgsql' STRICT; |
53105a77 | 96 | |
8e266c18 JM |
97 | -- read_or_error: |
98 | -- similar to read, but throws exception when stream is closed | |
494792ab | 99 | CREATE FUNCTION io.read_or_error(sid integer DEFAULT 0) RETURNS varchar AS $$ |
8e266c18 JM |
100 | DECLARE |
101 | input varchar; | |
102 | BEGIN | |
494792ab | 103 | input := io.read(sid); |
8e266c18 | 104 | IF input IS NULL THEN |
494792ab | 105 | raise EXCEPTION 'Stream ''%'' is closed', sid; |
8e266c18 JM |
106 | ELSE |
107 | RETURN input; | |
108 | END IF; | |
109 | END; $$ LANGUAGE 'plpgsql' STRICT; | |
110 | ||
111 | ||
53105a77 JM |
112 | -- readline: |
113 | -- set prompt and wait for readline style input on the stream | |
494792ab JM |
114 | CREATE FUNCTION io.readline(prompt varchar, sid integer DEFAULT 0) |
115 | RETURNS varchar AS $$ | |
53105a77 JM |
116 | DECLARE |
117 | query varchar; | |
118 | BEGIN | |
119 | -- set prompt / request readline style input | |
494792ab JM |
120 | IF sid = 0 THEN |
121 | PERFORM io.wait_flushed(1); | |
122 | ELSIF sid = 1 THEN | |
123 | PERFORM io.wait_flushed(0); | |
124 | END IF; | |
125 | query := format('LOCK io.stream; UPDATE io.stream SET rl_prompt = %L', | |
126 | prompt); | |
53105a77 JM |
127 | PERFORM dblink('dbname=mal', query); |
128 | ||
494792ab | 129 | RETURN io.read(sid); |
97c0256d | 130 | END; $$ LANGUAGE 'plpgsql' STRICT; |
53105a77 | 131 | |
494792ab | 132 | CREATE FUNCTION io.write(data varchar, sid integer DEFAULT 1) |
53105a77 JM |
133 | RETURNS void AS $$ |
134 | DECLARE | |
135 | query varchar; | |
136 | BEGIN | |
494792ab JM |
137 | query := format('LOCK io.stream; |
138 | UPDATE io.stream SET data = data || %L WHERE stream_id = %L', | |
139 | data, sid); | |
53105a77 JM |
140 | --RAISE NOTICE 'write query: %', query; |
141 | PERFORM dblink('dbname=mal', query); | |
97c0256d | 142 | END; $$ LANGUAGE 'plpgsql' STRICT; |
53105a77 | 143 | |
494792ab | 144 | CREATE FUNCTION io.writeline(data varchar, sid integer DEFAULT 1) |
53105a77 JM |
145 | RETURNS void AS $$ |
146 | BEGIN | |
494792ab | 147 | PERFORM io.write(data || E'\n', sid); |
97c0256d | 148 | END; $$ LANGUAGE 'plpgsql' STRICT; |
53105a77 | 149 | |
767d735d JM |
150 | -- --------------------------------------------------------- |
151 | ||
152 | -- called from wait_rl_prompt via dblink | |
494792ab | 153 | CREATE FUNCTION io.__wait_rl_prompt(sid integer) RETURNS varchar AS $$ |
767d735d | 154 | DECLARE |
494792ab | 155 | isopen boolean; |
767d735d | 156 | prompt varchar; |
494792ab | 157 | datas integer; |
767d735d | 158 | BEGIN |
494792ab JM |
159 | LOCK io.stream; |
160 | SELECT open, rl_prompt INTO isopen, prompt FROM io.stream | |
161 | WHERE stream_id = sid; | |
162 | SELECT count(stream_id) INTO datas FROM io.stream WHERE data <> ''; | |
163 | ||
164 | IF isopen = false THEN | |
767d735d JM |
165 | return NULL; |
166 | --raise EXCEPTION 'Stream ''%'' is closed', sid; | |
167 | END IF; | |
494792ab JM |
168 | |
169 | IF datas = 0 AND prompt <> '' THEN | |
170 | UPDATE io.stream SET rl_prompt = '' WHERE stream_id = sid; | |
767d735d | 171 | -- There is pending data on some stream |
494792ab | 172 | RETURN prompt; |
767d735d | 173 | END IF; |
494792ab | 174 | RETURN ''; -- '' -> no input |
767d735d JM |
175 | END; $$ LANGUAGE 'plpgsql' STRICT; |
176 | ||
53105a77 JM |
177 | -- wait_rl_prompt: |
178 | -- wait for rl_prompt to be set on the given stream and return the | |
767d735d | 179 | -- rl_prompt value. Errors if stream is already closed. |
494792ab | 180 | CREATE FUNCTION io.wait_rl_prompt(sid integer DEFAULT 0) RETURNS varchar AS $$ |
53105a77 | 181 | DECLARE |
53105a77 JM |
182 | query varchar; |
183 | prompt varchar; | |
184 | sleep real = 0.05; | |
185 | BEGIN | |
494792ab | 186 | query := format('SELECT io.__wait_rl_prompt(%L);', sid); |
53105a77 JM |
187 | WHILE true |
188 | LOOP | |
767d735d JM |
189 | SELECT rl_prompt INTO prompt FROM dblink('dbname=mal', query) |
190 | AS t1(rl_prompt varchar); | |
191 | IF prompt IS NULL THEN | |
494792ab | 192 | raise EXCEPTION 'Stream ''%'' is closed', sid; |
767d735d JM |
193 | END IF; |
194 | IF prompt <> '' THEN | |
195 | sleep := 0.05; -- reset sleep timer | |
196 | RETURN prompt; | |
53105a77 JM |
197 | END IF; |
198 | PERFORM pg_sleep(sleep); | |
199 | IF sleep < 0.5 THEN | |
200 | sleep := sleep * 1.1; -- backoff | |
201 | END IF; | |
202 | END LOOP; | |
97c0256d | 203 | END; $$ LANGUAGE 'plpgsql' STRICT; |
53105a77 | 204 | |
494792ab | 205 | CREATE FUNCTION io.wait_flushed(sid integer DEFAULT 1) RETURNS void AS $$ |
8e266c18 JM |
206 | DECLARE |
207 | query varchar; | |
208 | pending integer; | |
209 | sleep real = 0.05; | |
210 | BEGIN | |
494792ab JM |
211 | query := format('SELECT count(stream_id) FROM io.stream |
212 | WHERE stream_id = %L AND data <> ''''', sid); | |
8e266c18 JM |
213 | WHILE true |
214 | LOOP | |
215 | SELECT p INTO pending FROM dblink('dbname=mal', query) | |
216 | AS t1(p integer); | |
217 | IF pending = 0 THEN RETURN; END IF; | |
218 | PERFORM pg_sleep(sleep); | |
219 | IF sleep < 0.5 THEN | |
220 | sleep := sleep * 1.1; -- backoff | |
221 | END IF; | |
222 | END LOOP; | |
223 | END; $$ LANGUAGE 'plpgsql' STRICT; | |
224 |