1、开始、关闭libreoffice服务;
开始之前同步字体文件时间,是因为创建soffice服务时,服务会检查所需加载的文件的时间,如果其认为时间不符,则其可能会重新加载,耗时较长,因此需事先统一时间。
使用时如果需要多次调用,最好每次调用都先开启服务、用完后关闭,否则libreoffice会创建一个缓存文档并越用越大,处理时间会增加。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
class OfficeProcess(object):
    """Start and stop a local ``soffice`` process that accepts UNO connections.

    The process is launched with ``--accept`` on ``localhost:2002`` and writes
    its pid to ``sof.pid`` in the current working directory.
    """

    def __init__(self):
        self.p = 0
        # Give every font file the same modification time. The accompanying
        # article explains that soffice checks these times at start-up and
        # may spend a long time reloading when they look inconsistent.
        self._font_sync = subprocess.Popen(
            'find /usr/share/fonts | xargs touch -m -t 201801010000.00',
            shell=True)

    def start_office(self, timeout=None):
        """Launch soffice and block until the UNO socket can be resolved.

        Args:
            timeout: optional number of seconds to keep retrying. ``None``
                (the default) retries forever, as the original code did.

        Raises:
            TimeoutError: if ``timeout`` is given and the connection could
                not be established in time.
        """
        # The font touch must have finished before soffice scans the fonts.
        font_sync = getattr(self, '_font_sync', None)
        if font_sync is not None:
            font_sync.wait()
            self._font_sync = None

        self.p = subprocess.Popen(
            'soffice --pidfile=sof.pid --invisible '
            '--accept="socket,host=localhost,port=2002;urp;"',
            shell=True)

        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            try:
                local_context = uno.getComponentContext()
                resolver = local_context.getServiceManager().createInstanceWithContext(
                    'com.sun.star.bridge.UnoUrlResolver', local_context)
                resolver.resolve(
                    'uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext')
                return
            except Exception:
                # ``Exception`` rather than a bare except, so Ctrl-C and
                # SystemExit can still interrupt the retry loop.
                if deadline is not None and time.monotonic() >= deadline:
                    raise TimeoutError('soffice did not accept a connection in time')
                print(ts(), "wait for connecting soffice...")
                time.sleep(1)

    def stop_office(self):
        """Send SIGTERM to the pid recorded in ``sof.pid`` and reap the launcher.

        Best effort: a missing or unreadable pid file, or a process that can
        no longer be signalled, is ignored.
        """
        try:
            with open("sof.pid", "rb") as f:
                pid = int(f.read())
        except (OSError, ValueError):
            # No pid file, or its content is not a number: nothing to stop.
            return
        try:
            os.kill(pid, signal.SIGTERM)
        except (OSError, OverflowError):
            # Signal could not be delivered; do not wait on the launcher.
            return
        # ``self.p`` is still the placeholder 0 if start_office() never ran.
        if hasattr(self.p, 'wait'):
            self.p.wait()
2、init service manager
1
2
3
4
5
6
|
# Resolve the component context of the soffice instance listening on
# localhost:2002, then keep its service manager and a Desktop instance on self.
local_context = uno.getComponentContext()
service_manager = local_context.getServiceManager()
resolver = service_manager.createInstanceWithContext(
    'com.sun.star.bridge.UnoUrlResolver', local_context)
self.ctx = resolver.resolve(
    'uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext')
self.smgr = self.ctx.ServiceManager
self.desktop = self.smgr.createInstanceWithContext(
    'com.sun.star.frame.Desktop', self.ctx)
3、从二进制数据中读取doc文档
1
2
3
4
5
6
7
8
9
10
11
12
|
def ImportFromMemory(self, data):
    """Load a Writer document from in-memory bytes.

    ``data`` is wrapped in a ``SequenceInputStream`` and handed to the
    desktop through the ``private:stream/swriter`` URL. On success
    ``self.document`` and ``self.text`` are set; on any load failure
    ``self.text`` is set to ``None``. ``self.doc`` is reset to an empty
    result in both cases.
    """
    istream = self.smgr.createInstanceWithContext(
        'com.sun.star.io.SequenceInputStream', self.ctx)
    istream.initialize((uno.ByteSequence(data),))

    pv = PropertyValue()
    pv.Name = 'InputStream'
    pv.Value = istream

    self.doc = {'doc': []}
    try:
        self.document = self.desktop.loadComponentFromURL(
            'private:stream/swriter', '_blank', 0, (pv,))
        self.text = self.document.getText()
    except Exception:
        # Was a bare ``except:``; narrowed so KeyboardInterrupt/SystemExit
        # are no longer swallowed while a document is loading.
        self.text = None
4、读取doc文档中的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
|
def ExportToJson(self):
    """Serialise the loaded document to a JSON string.

    Parses ``self.text`` into ``self.doc['doc']`` and stores the total text
    length under ``self.doc['length']``. If parsing fails for any reason
    (including ``self.text`` being ``None``), an empty document
    ``{'doc': [], 'length': 0}`` is returned instead.
    """
    try:
        total = self.__ParseText(self.text, self.__Callback(self.doc['doc']))
        self.doc['length'] = total
    except Exception:
        # Was a bare ``except:``; KeyboardInterrupt/SystemExit now propagate.
        self.doc = {'doc': [], 'length': 0}
    return json.dumps(self.doc)


@staticmethod
def __Callback(alist):
    """Return a one-argument function that appends its argument to ``alist``."""
    def Append(sth):
        alist.append(sth)
    return Append
1
2
3
4
5
6
7
8
9
10
11
12
|
def __ParseText(self, text, func):
    """Walk the top-level elements of ``text`` and parse each one.

    Paragraphs go to ``__ParseParagraph`` and text tables to
    ``__ParseTable``; any other element is skipped. ``func`` is passed
    through to the element parsers as the result sink.

    Returns:
        The sum of the lengths reported by the element parsers.
    """
    total = 0
    text_it = text.createEnumeration()
    while text_it.hasMoreElements():
        element = text_it.nextElement()
        if element.supportsService('com.sun.star.text.Paragraph'):
            total += self.__ParseParagraph(element, func)
        elif element.supportsService('com.sun.star.text.TextTable'):
            total += self.__ParseTable(element, func)
    return total
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
# The functions below call each other through ``self`` and through
# ``WordToJson.<name>``, i.e. they are written as methods of a class named
# ``WordToJson`` whose header is not part of this snippet.

def __ParseParagraph(self, paragraph, func):
    """Parse one paragraph into ``{'paragraph': [...], 'length': n}``.

    Text and text-field portions contribute their string; soft page breaks
    are ignored; every other portion type is scanned for anchored content.
    The finished dict is handed to ``func`` and the length is returned.
    """
    p = {'paragraph': []}
    l = 0
    paragraph_it = paragraph.createEnumeration()
    while paragraph_it.hasMoreElements():
        portion = paragraph_it.nextElement()
        portion_type = portion.TextPortionType
        if portion_type in ('Text', 'TextField'):
            l += self.__ParsePortionText(portion, self.__Callback(p['paragraph']))
        elif portion_type == 'SoftPageBreak':
            pass
        else:
            l += self.__ParseTextContent(portion, self.__Callback(p['paragraph']))
    # NOTE(review): the source was collapsed onto one line, so the nesting of
    # this check is reconstructed. It only involves the paragraph itself, so
    # it is placed after the portion loop - confirm against the original.
    if hasattr(paragraph, 'createContentEnumeration'):
        l += self.__ParseTextContent(paragraph, self.__Callback(p['paragraph']))
    p['length'] = l
    func(p)
    return l


def __ParseTextContent(self, textcontent, func):
    """Parse the text contents enumerated by ``textcontent``.

    Graphics, text frames and group shapes are parsed; embedded objects and
    anything else are skipped. Returns the accumulated length.
    """
    l = 0
    content_it = textcontent.createContentEnumeration('com.sun.star.text.TextContent')
    while content_it.hasMoreElements():
        element = content_it.nextElement()
        if element.supportsService('com.sun.star.text.TextGraphicObject'):
            l += self.__ParsePortionGraphic(element, func)
        elif element.supportsService('com.sun.star.text.TextEmbeddedObject'):
            pass
        elif element.supportsService('com.sun.star.text.TextFrame'):
            l += self.__ParseFrame(element, func)
        elif element.supportsService('com.sun.star.drawing.GroupShape'):
            l += self.__ParseGroup(element, func)
    return l


def __ParseFrame(self, frame, func):
    """Parse the text of ``frame`` into ``{'frame': [...], 'length': n}``."""
    f = {'frame': []}
    l = self.__ParseText(frame.getText(), self.__Callback(f['frame']))
    f['length'] = l
    func(f)
    return l


def __ParseGroup(self, group, func):
    """Parse every member of ``group`` that supports the drawing Text service."""
    l = 0
    for i in range(group.getCount()):
        it = group.getByIndex(i)
        if it.supportsService('com.sun.star.drawing.Text'):
            l += self.__ParseFrame(it, func)
    return l


def __ParsePortionText(self, portion_text, func):
    """Emit ``{'portion': text, 'length': len(text)}`` and return the length."""
    content = portion_text.String
    func({'portion': content, 'length': len(content)})
    return len(content)


def __ParsePortionGraphic(self, portion_graphic, func):
    """Export a graphic as base64-encoded PNG together with its geometry.

    The graphic is stored through a ``GraphicProvider`` into a ``TempFile``
    stream, read back and emitted as a dict with the keys ``image``,
    ``height``, ``width``, ``actualheight``, ``actualwidth``, ``croptop``,
    ``cropbottom``, ``cropleft``, ``cropright`` and ``length`` (always 0).
    Always returns 0: images do not count towards the text length.
    """
    gp = self.smgr.createInstanceWithContext('com.sun.star.graphic.GraphicProvider', self.ctx)
    stream = self.smgr.createInstanceWithContext('com.sun.star.io.TempFile', self.ctx)

    pv1 = PropertyValue()
    pv1.Name = 'OutputStream'
    pv1.Value = stream
    pv2 = PropertyValue()
    pv2.Name = 'MimeType'
    pv2.Value = 'image/png'
    gp.storeGraphic(portion_graphic.Graphic, (pv1, pv2))

    stream.getOutputStream().flush()
    stream.seek(0)
    l = stream.getInputStream().available()
    b = uno.ByteSequence(b'')
    stream.seek(0)
    l, b = stream.getInputStream().readBytes(b, l)

    crop = portion_graphic.GraphicCrop
    actual = portion_graphic.ActualSize
    img = {
        'image': base64.b64encode(b.value).decode('ascii'),
        'height': portion_graphic.Height,
        'width': portion_graphic.Width,
        'actualheight': actual.Height,
        'actualwidth': actual.Width,
        'croptop': crop.Top,
        'cropbottom': crop.Bottom,
        'cropleft': crop.Left,
        'cropright': crop.Right,
        'length': 0,
    }
    func(img)
    return 0


def __ParseTable(self, table, func):
    """Parse a text table into a row/column dict and emit it through ``func``.

    Each cell entry carries ``rowspan``, ``colspan``, ``name`` and the parsed
    ``content``. The table is only emitted when the spans of all cells add up
    to ``rows * columns``; otherwise, or on any other error, the table is
    discarded and 0 is returned.
    """
    l = 0
    try:
        matrix = self.__GetTableMatrix(table)
        seps = self.__GetTableSeparators(table)
        t = {}
        count = 0
        for ri, row in matrix.items():
            t[ri] = {}
            for ci, entry in row.items():
                # Copy the cell record without the live UNO cell object.
                info = dict(entry)
                del info['cell']
                info['content'] = []
                t[ri][ci] = info
                l += self.__ParseText(entry['cell'], self.__Callback(info['content']))
                count += info['rowspan'] * info['colspan']
        if count != len(t) * len(seps):
            raise ValueError('count of cells error')
        func({'table': t, 'row': len(t), 'column': len(seps),
              'length': l, 'tableid': self.table_id})
        self.table_id += 1
    except Exception:
        # Was a bare ``except:``; KeyboardInterrupt/SystemExit now propagate.
        l = 0
        print('discard wrong table')
    return l


@staticmethod
def __GetTableSeparators(table):
    """Return the sorted, de-duplicated column boundaries of ``table``.

    Collects ``TableColumnRelativeSum`` plus every separator position of
    every row. Positions that differ by exactly 1 are treated as the same
    boundary: the smaller one is bumped up before de-duplicating again.
    """
    result = [table.TableColumnRelativeSum]
    rows = table.getRows()
    for ri in range(rows.getCount()):
        result += [s.Position for s in rows.getByIndex(ri).TableColumnSeparators]
    result = sorted(set(result))
    for i in range(len(result) - 1):
        if result[i] + 1 == result[i + 1]:
            result[i] += 1
    return sorted(set(result))


@staticmethod
def __NameToRC(name):
    """Convert a cell name such as ``'B3'`` to a zero-based ``(row, column)``.

    Column letters are read as base-52 digits: ``A``-``Z`` map to 0-25 and
    ``a``-``z`` to 26-51.

    NOTE(review): with this scheme a leading ``A`` contributes 0, so ``'AA1'``
    yields the same column as ``'A1'``. Confirm how multi-letter column names
    should be numbered before relying on tables wider than 52 columns.
    """
    r = int(re.sub(r'[A-Za-z]', '', name)) - 1
    cstr = re.sub(r'[0-9]', '', name)
    c = 0
    for ch in cstr:
        if 'A' <= ch <= 'Z':
            c = c * 52 + ord(ch) - ord('A')
        else:
            c = c * 52 + 26 + ord(ch) - ord('a')
    return r, c


@staticmethod
def __GetTableMatrix(table):
    """Build ``{row: {col: {'cell', 'rowspan', 'name', 'colspan'}}}`` for ``table``.

    ``colspan`` is computed by locating the left and right boundary of each
    cell (taken from its own row's separators) in the table-wide separator
    list and counting the slots in between.
    """
    result = {}
    for name in table.getCellNames():
        ri, ci = WordToJson.__NameToRC(name)
        cell = table.getCellByName(name)
        if ri not in result:
            result[ri] = {}
        result[ri][ci] = {'cell': cell, 'rowspan': cell.RowSpan, 'name': name}

    seps = WordToJson.__GetTableSeparators(table)

    def slot(position):
        # A boundary may have been bumped by 1 in __GetTableSeparators.
        return seps.index(position) if position in seps else seps.index(position + 1)

    for ri in result:
        row_seps = table.getRows().getByIndex(ri).TableColumnSeparators
        sep = sorted(set([s.Position for s in row_seps] + [table.TableColumnRelativeSum]))
        for ci in result[ri]:
            right = slot(sep[ci])
            left = -1 if ci == 0 else slot(sep[ci - 1])
            result[ri][ci]['colspan'] = right - left
    return result
5、写doc文档
1
2
3
4
|
# Create a new, empty Writer document for the output and keep its text
# object and a text cursor on self.
self.doco = self.desktop.loadComponentFromURL(
    'private:factory/swriter', '_blank', 0, ())
self.texto = self.doco.getText()
self.cursoro = self.texto.createTextCursor()
# Spacing below each paragraph written through this cursor
# (presumably in 1/100 mm - TODO confirm the unit).
self.cursoro.ParaBottomMargin = 500
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
def __WriteText(self, text, texto, cursoro):
    """Write a list of parsed items into ``texto`` at ``cursoro``.

    Items are dispatched on the key they contain: ``'paragraph'``,
    ``'image'`` or ``'table'``. Items with none of these keys are skipped.
    """
    for it in text:
        if 'paragraph' in it:
            self.__WriteParagraph(it, texto, cursoro)
        elif 'image' in it:
            self.__WritePortionGraphic(it, texto, cursoro)
        elif 'table' in it:
            self.__WriteTable(it, texto, cursoro)


def __WriteParagraph(self, paragraph, texto, cursoro):
    """Insert one paragraph followed by a paragraph break.

    When the item has a ``'result'`` list, the ``'trans_sen'`` string of each
    entry is inserted; otherwise ``paragraph['paragraph']`` is inserted.
    """
    if paragraph['length'] > 0:
        if 'result' in paragraph:
            for it in paragraph['result']:
                texto.insertString(cursoro, it['trans_sen'], False)
        else:
            texto.insertString(cursoro, paragraph['paragraph'], False)
    # NOTE(review): nesting is reconstructed from a one-line source. The break
    # is emitted for empty paragraphs too, so that they are preserved in the
    # output - confirm against the original.
    texto.insertControlCharacter(cursoro, ControlCharacter.PARAGRAPH_BREAK, False)


def __WritePortionGraphic(self, portion_graphic, texto, cursoro):
    """Insert a base64-encoded PNG as a ``TextGraphicObject`` plus a paragraph break.

    ``actualheight`` / ``actualwidth`` fall back to ``height`` / ``width`` and
    the four crop values fall back to 0 when they are absent from the item.
    """
    png = base64.b64decode(portion_graphic['image'])

    gp = self.smgr.createInstanceWithContext('com.sun.star.graphic.GraphicProvider', self.ctx)
    istream = self.smgr.createInstanceWithContext('com.sun.star.io.SequenceInputStream', self.ctx)
    istream.initialize((uno.ByteSequence(png),))
    pv = PropertyValue()
    pv.Name = 'InputStream'
    pv.Value = istream

    actualsize = uno.createUnoStruct('com.sun.star.awt.Size')
    actualsize.Height = (portion_graphic['actualheight']
                         if 'actualheight' in portion_graphic
                         else portion_graphic['height'])
    actualsize.Width = (portion_graphic['actualwidth']
                        if 'actualwidth' in portion_graphic
                        else portion_graphic['width'])

    graphiccrop = uno.createUnoStruct('com.sun.star.text.GraphicCrop')
    graphiccrop.Top = portion_graphic.get('croptop', 0)
    graphiccrop.Bottom = portion_graphic.get('cropbottom', 0)
    graphiccrop.Left = portion_graphic.get('cropleft', 0)
    graphiccrop.Right = portion_graphic.get('cropright', 0)

    image = self.doco.createInstance('com.sun.star.text.TextGraphicObject')
    image.Surround = NONE
    image.Graphic = gp.queryGraphic((pv,))
    image.Height = portion_graphic['height']
    image.Width = portion_graphic['width']
    image.setPropertyValue('ActualSize', actualsize)
    image.setPropertyValue('GraphicCrop', graphiccrop)

    texto.insertTextContent(cursoro, image, False)
    texto.insertControlCharacter(cursoro, ControlCharacter.PARAGRAPH_BREAK, False)


def __WriteTable(self, table, texto, cursoro):
    """Insert a table, merge spanned cells and fill every cell with its content.

    ``table['table']`` is read with string row/column keys, visited in
    numeric order.
    """
    tableo = self.doco.createInstance('com.sun.star.text.TextTable')
    tableo.initialize(table['row'], table['column'])
    texto.insertTextContent(cursoro, tableo, False)

    # Probe whether the table cursor can extend a selection downwards. If the
    # range is still just 'A1' afterwards, cell merging is skipped altogether.
    tcursoro = tableo.createCursorByCellName("A1")
    hitbug = False
    if table['row'] > 1:
        tcursoro.goDown(1, True)
        hitbug = tcursoro.getRangeName() == 'A1'

    for ri in sorted(int(r) for r in table['table']):
        rs = table['table'][str(ri)]
        for ci in sorted(int(c) for c in rs):
            cell = rs[str(ci)]
            if not hitbug and (cell['rowspan'] > 1 or cell['colspan'] > 1):
                tcursoro.gotoCellByName(cell['name'], False)
                if cell['rowspan'] > 1:
                    tcursoro.goDown(cell['rowspan'] - 1, True)
                if cell['colspan'] > 1:
                    tcursoro.goRight(cell['colspan'] - 1, True)
                tcursoro.mergeRange()
            ctexto = tableo.getCellByName(cell['name'])
            if ctexto is None:
                # The name no longer resolves to a cell; skip it.
                continue
            ccursoro = ctexto.createTextCursor()
            ccursoro.CharWeight = FontWeight.NORMAL
            ccursoro.CharWeightAsian = FontWeight.NORMAL
            ccursoro.ParaAdjust = LEFT
            self.__WriteText(cell['content'], ctexto, ccursoro)
6、生成二进制的doc文档数据
1
2
3
4
|
# Store the output document into an in-memory Pipe using the
# 'MS Word 2007 XML' filter, then read the resulting bytes into ``datao``.
streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
self.doco.storeToURL(
    'private:stream',
    (
        PropertyValue('FilterName', 0, 'MS Word 2007 XML', 0),
        PropertyValue('OutputStream', 0, streamo, 0),
    ),
)
streamo.flush()
_, datao = streamo.readBytes(None, streamo.available())
7、从doc文档数据生成pdf的二进制数据
1
2
3
4
|
# Store the output document into an in-memory Pipe using the
# 'writer_pdf_Export' filter, then read the resulting bytes into ``datap``.
streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
self.doco.storeToURL(
    'private:stream',
    (
        PropertyValue('FilterName', 0, 'writer_pdf_Export', 0),
        PropertyValue('OutputStream', 0, streamo, 0),
    ),
)
streamo.flush()
_, datap = streamo.readBytes(None, streamo.available())
8、读取excel二进制数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
def ImportFromMemory(self, data):
    """Load a Calc spreadsheet from in-memory bytes.

    ``data`` is wrapped in a ``SequenceInputStream`` and handed to the
    desktop through the ``private:stream/scalc`` URL. On success
    ``self.document`` and ``self.sheets`` are set; on any load failure
    ``self.sheets`` is set to ``None``. ``self.doc`` is reset to an empty
    result in both cases. Progress is reported with ``print``.
    """
    istream = self.smgr.createInstanceWithContext(
        'com.sun.star.io.SequenceInputStream', self.ctx)
    istream.initialize((uno.ByteSequence(data),))

    pv = PropertyValue()
    pv.Name = 'InputStream'
    pv.Value = istream

    self.doc = {'doc': []}
    try:
        print("before loadComponentFromURL")
        self.document = self.desktop.loadComponentFromURL(
            'private:stream/scalc', '_blank', 0, (pv,))
        self.sheets = self.document.getSheets()
        print("ImportFromMemory done")
    except Exception:
        # Was a bare ``except:``; KeyboardInterrupt/SystemExit now propagate.
        print("ImportFromMemory failed")
        self.sheets = None
9、读取excel的文本数据
1
2
3
4
5
6
7
|
def ExportToJson(self):
    """Serialise the loaded spreadsheet to a JSON string.

    Parses ``self.sheets`` into ``self.doc['doc']`` and stores the total text
    length under ``self.doc['length']``. If parsing fails for any reason
    (including ``self.sheets`` being ``None``), an empty document
    ``{'doc': [], 'length': 0}`` is returned instead.
    """
    try:
        total = self.__ParseText(self.sheets, self.__Callback(self.doc['doc']))
        self.doc['length'] = total
    except Exception:
        # Was a bare ``except:``; KeyboardInterrupt/SystemExit now propagate.
        self.doc = {'doc': [], 'length': 0}
    return json.dumps(self.doc)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
def __ParseText(self, sheets, func):
    """Parse every spreadsheet in ``sheets`` and return the total text length."""
    l = 0
    sheets_it = sheets.createEnumeration()
    while sheets_it.hasMoreElements():
        element = sheets_it.nextElement()
        if element.supportsService('com.sun.star.sheet.Spreadsheet'):
            l += self.__ParseSpreadsheet(element, func)
    return l


def __ParseSpreadsheet(self, spreadsheet, func):
    """Collect the text cells among the visible cells of one sheet.

    Only cells whose type equals ``self.TEXT`` are exported; empty, value
    and formula cells are merely logged. The result
    ``{'spreadsheet': [...], 'length': n}`` is handed to ``func`` and the
    length is returned.
    """
    l = 0
    p = {'spreadsheet': []}
    visible_cells_it = spreadsheet.queryVisibleCells().getCells().createEnumeration()
    while visible_cells_it.hasMoreElements():
        cell = visible_cells_it.nextElement()
        # Named ``cell_type`` so the builtin ``type`` is not shadowed.
        cell_type = cell.getType()
        if cell_type == self.EMPTY:
            print("cell.type==empty")
        elif cell_type == self.VALUE:
            print("cell.type==VALUE", "value=", cell.getValue(), cell.getCellAddress())
        elif cell_type == self.TEXT:
            print("cell.type==TEXT", "content=", cell.getString().encode("UTF-8"),
                  cell.getCellAddress())
            l += self.__ParseCellText(spreadsheet, cell, self.__Callback(p['spreadsheet']))
            print("__ParseCellText=", p)
        elif cell_type == self.FORMULA:
            print("cell.type==FORMULA", "formula=", cell.getValue())
    p['length'] = l
    func(p)
    return l


def __ParseCellText(self, sheet, cell, func):
    """Emit the text of one cell with its position and sheet name.

    If the address or the sheet name cannot be read, ``x`` and ``y`` are
    reported as -1 and ``sheetname`` as ``None``. Returns the text length.
    """
    try:
        address = cell.getCellAddress()
        x = address.Column
        y = address.Row
        sheetname = sheet.getName()
    except Exception:
        # Was a bare ``except:``; KeyboardInterrupt/SystemExit now propagate.
        x = -1
        y = -1
        sheetname = None
    content = cell.getString()
    func({'celltext': content, 'x': x, 'y': y,
          'sheetname': sheetname, 'length': len(content)})
    return len(content)
1
2
3
4
|
# Cache the CellContentType enum values that cell.getType() results are
# compared against.
self.EMPTY = uno.Enum("com.sun.star.table.CellContentType", "EMPTY")
self.TEXT = uno.Enum("com.sun.star.table.CellContentType", "TEXT")
self.FORMULA = uno.Enum("com.sun.star.table.CellContentType", "FORMULA")
self.VALUE = uno.Enum("com.sun.star.table.CellContentType", "VALUE")
10、替换excel的文本信息
1
2
3
4
5
6
|
def ImportFromJson(self, data):
    """Apply the items under ``'doc'`` of a JSON string to the loaded document.

    Writing is best effort: any error raised while writing (including a
    missing ``'doc'`` key) is reported with ``print`` and otherwise ignored.
    Malformed JSON still raises from ``json.loads``.
    """
    doc = json.loads(data)
    try:
        self.__WriteText(doc['doc'])
    except Exception as err:
        # Previously ``except: pass``; still non-fatal, but no longer silent.
        print("ImportFromJson failed:", err)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
def __WriteText(self, text):
    """Write every item that has both ``'paragraph'`` and ``'sheetname'`` keys.

    The sheet is looked up by name in ``self.sheets`` and reused while
    consecutive items name the same sheet. Items whose sheet cannot be
    found are skipped.
    """
    print("__WriteText begin:", text)
    sheet = None
    for it in text:
        if 'paragraph' in it and 'sheetname' in it:
            if sheet is None or sheet.getName() != it['sheetname']:
                try:
                    sheet = self.sheets.getByName(it['sheetname'])
                    print("getsheet:", it['sheetname'], "=", sheet.getName())
                except Exception:
                    # Was a bare ``except:``; unknown sheet -> skip this item.
                    sheet = None
                    continue
            self.__WriteParagraph(it, sheet)


def __WriteParagraph(self, paragraph, sheet):
    """Replace the text of the cell at ``(paragraph['x'], paragraph['y'])``.

    Nothing is written when ``paragraph['length']`` is 0, when the cell
    cannot be obtained, or when the item has no ``'result'`` list. Each
    ``'trans_sen'`` entry is set on the cell in turn, so the last one wins.
    """
    print("__WriteParagraph")
    if paragraph['length'] <= 0:
        return
    try:
        x = paragraph['x']
        y = paragraph['y']
        print("getcell:", x, y)
        cell = sheet.getCellByPosition(x, y)
        print("getcell done")
    except Exception:
        # Was a bare ``except:``; missing coordinates or cell -> give up.
        return
    if 'result' in paragraph:
        for it in paragraph['result']:
            print("cell=", cell.getString())
            cell.setString(it['trans_sen'])
            print("cell,", cell.getString(), ",done")
11、生成excel文档二进制数据
1
2
3
4
|
# Store the spreadsheet into an in-memory Pipe using the
# 'Calc MS Excel 2007 XML' filter, then read the resulting bytes into ``datao``.
streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
self.document.storeToURL(
    'private:stream',
    (
        PropertyValue('FilterName', 0, 'Calc MS Excel 2007 XML', 0),
        PropertyValue('OutputStream', 0, streamo, 0),
    ),
)
streamo.flush()
_, datao = streamo.readBytes(None, streamo.available())
12、生成excel的pdf文档
1
2
3
4
|
# Store the spreadsheet into an in-memory Pipe using the
# 'calc_pdf_Export' filter, then read the resulting bytes into ``datap``.
streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
self.document.storeToURL(
    'private:stream',
    (
        PropertyValue('FilterName', 0, 'calc_pdf_Export', 0),
        PropertyValue('OutputStream', 0, streamo, 0),
    ),
)
streamo.flush()
_, datap = streamo.readBytes(None, streamo.available())
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/zl1991/p/10615881.html